Skip to content

Commit

Permalink
beater/api/intake: fix rate limiting (#5518)
Browse files Browse the repository at this point in the history
* beater/api/intake: fix rate limiting

Copy the batch processor inside the closure before
wrapping, to avoid wrapping it multiple times.

* systemtest: fix rate limiting test

The test was wrong, and only passed because of
the bug that has been fixed. We should be using
up the per-IP event limits on the first two
requests, but weren't taking into account the
burst multiplier (limit x 3).

(cherry picked from commit 97a6f9b)

# Conflicts:
#	beater/api/intake/handler.go
#	beater/api/intake/handler_test.go
#	systemtest/rum_test.go
  • Loading branch information
axw authored and mergify-bot committed Jul 9, 2021
1 parent e369a0e commit f96efa4
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 0 deletions.
13 changes: 13 additions & 0 deletions beater/api/intake/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,19 @@ func Handler(handler StreamHandler, requestMetadataFunc RequestMetadataFunc, bat
return
}

<<<<<<< HEAD
=======
// copy batchProcessor to avoid updating closure below
batchProcessor := batchProcessor
if limiter, ok := ratelimit.FromContext(c.Request.Context()); ok {
// Apply rate limiting after reading but before processing any events.
batchProcessor = modelprocessor.Chained{
rateLimitBatchProcessor(limiter, batchSize),
batchProcessor,
}
}

>>>>>>> 97a6f9b8 (beater/api/intake: fix rate limiting (#5518))
reader, err := decoder.CompressedRequestReader(c.Request)
if err != nil {
writeError(c, compressedRequestReaderError{err})
Expand Down
90 changes: 90 additions & 0 deletions beater/api/intake/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,96 @@ func TestIntakeHandler(t *testing.T) {
}
}

<<<<<<< HEAD
=======
// TestRateLimiting verifies that the intake handler honours a rate
// limiter supplied via the request context: requests pass when the
// limiter has capacity, and are rejected with 429 when it does not.
func TestRateLimiting(t *testing.T) {
	type testcase struct {
		limiter       *rate.Limiter // limiter injected into the request context
		preconsumed   int           // tokens consumed before the request is handled
		expectLimited bool          // whether the request should be rejected
	}

	cases := map[string]testcase{
		"LimiterAllowAll": {
			limiter: rate.NewLimiter(rate.Limit(40), 40*5),
		},
		"LimiterPartiallyUsedLimitAllow": {
			limiter:     rate.NewLimiter(rate.Limit(10), 10*2),
			preconsumed: 10,
		},
		"LimiterDenyAll": {
			limiter:       rate.NewLimiter(rate.Limit(0), 2),
			expectLimited: true,
		},
		"LimiterPartiallyUsedLimitDeny": {
			limiter:       rate.NewLimiter(rate.Limit(7), 7*2),
			preconsumed:   10,
			expectLimited: true,
		},
		"LimiterDeny": {
			limiter:       rate.NewLimiter(rate.Limit(6), 6*2),
			expectLimited: true,
		},
	}

	for name, tt := range cases {
		t.Run(name, func(t *testing.T) {
			var tc testcaseIntakeHandler
			tc.path = "ratelimit.ndjson"
			tc.setup(t)

			// Optionally use up part of the limiter's burst before the
			// request is processed.
			if tt.preconsumed > 0 {
				tt.limiter.AllowN(time.Now(), tt.preconsumed)
			}
			ctx := ratelimit.ContextWithLimiter(tc.c.Request.Context(), tt.limiter)
			tc.c.Request = tc.c.Request.WithContext(ctx)

			h := Handler(tc.processor, emptyRequestMetadata, tc.batchProcessor)
			h(tc.c)

			if !tt.expectLimited {
				assert.Equal(t, request.IDResponseValidAccepted, tc.c.Result.ID)
				assert.Equal(t, http.StatusAccepted, tc.w.Code)
				assert.NoError(t, tc.c.Result.Err)
			} else {
				assert.Equal(t, request.IDResponseErrorsRateLimit, tc.c.Result.ID)
				assert.Equal(t, http.StatusTooManyRequests, tc.w.Code)
				assert.Error(t, tc.c.Result.Err)
			}
			assert.NotZero(t, tc.w.Body.Len())
			approvaltest.ApproveJSON(t, "test_approved/"+t.Name(), tc.w.Body.Bytes())
		})
	}
}

// TestRateLimitingRequests verifies that a limiter shared across
// multiple requests is depleted correctly.
//
// ratelimit.ndjson contains 19 events, and rate limiting is applied in
// batches of 10 events, so a burst of 41 covers exactly two requests
// with a single token left over.
func TestRateLimitingRequests(t *testing.T) {
	limiter := rate.NewLimiter(1, 41)
	processor := stream.BackendProcessor(config.DefaultConfig())
	handler := Handler(processor, emptyRequestMetadata, modelprocessor.Nop{})

	data, err := ioutil.ReadFile("../../../testdata/intake-v2/ratelimit.ndjson")
	require.NoError(t, err)

	// sendRequest POSTs the payload with the shared limiter attached to
	// the request context, returning the recorded status code.
	sendRequest := func() int {
		req := httptest.NewRequest("POST", "/", bytes.NewBuffer(data))
		req = req.WithContext(ratelimit.ContextWithLimiter(req.Context(), limiter))
		req.Header.Add("Content-Type", "application/x-ndjson")

		rec := httptest.NewRecorder()
		c := request.NewContext()
		c.Reset(rec, req)
		handler(c)
		return rec.Code
	}

	for i := 0; i < 2; i++ {
		assert.Equal(t, http.StatusAccepted, sendRequest())
	}

	// Exactly one token should remain after the two requests.
	assert.True(t, limiter.Allow())
	assert.False(t, limiter.Allow())
}

>>>>>>> 97a6f9b8 (beater/api/intake: fix rate limiting (#5518))
type testcaseIntakeHandler struct {
c *request.Context
w *httptest.ResponseRecorder
Expand Down
11 changes: 11 additions & 0 deletions systemtest/rum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func TestRUMRateLimit(t *testing.T) {
err = sendEvents("10.11.12.13", 3*srv.Config.RUM.RateLimit.EventLimit)
assert.NoError(t, err)

<<<<<<< HEAD
// Sending the events over multiple requests should have the same outcome.
for i := 0; i < 3; i++ {
err = sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit)
Expand All @@ -171,6 +172,16 @@ func TestRUMRateLimit(t *testing.T) {
// The rate limiter cache only has space for 2 IPs, so the 3rd one reuses an existing
// limiter, which will have already been exhausted.
err = sendEvents("10.11.12.15", 10)
=======
var g errgroup.Group
g.Go(func() error { return sendEvents("10.11.12.13", srv.Config.RUM.RateLimit.EventLimit*3) })
g.Go(func() error { return sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit*3) })
assert.NoError(t, g.Wait())

g = errgroup.Group{}
g.Go(func() error { return sendEvents("10.11.12.15", srv.Config.RUM.RateLimit.EventLimit) })
err = g.Wait()
>>>>>>> 97a6f9b8 (beater/api/intake: fix rate limiting (#5518))
require.Error(t, err)

// The exact error differs, depending on whether rate limiting was applied at the request
Expand Down

0 comments on commit f96efa4

Please sign in to comment.