From c613464048ce4a9f189851a9c2b548e42f4b9244 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 23 Jun 2021 16:37:28 +0800 Subject: [PATCH] beater/api/intake: fix rate limiting (#5518) * beater/api/intake: fix rate limiting Copy the batch processor in the closure before copying, to avoid wrapping it multiple times. * systemtest: fix rate limiting test The test was wrong, and only passed because of the bug that has been fixed. We should be using up the per-IP event limits on the first two requests, but weren't taking into account the burst multiplier (limit x 3). (cherry picked from commit 97a6f9b86f2c30493696e2f9d10e1236accbe672) --- beater/api/intake/handler.go | 2 ++ beater/api/intake/handler_test.go | 26 ++++++++++++++++++++++++++ systemtest/rum_test.go | 6 ++---- 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/beater/api/intake/handler.go b/beater/api/intake/handler.go index 3d9a4d54937..6a29d24abfe 100644 --- a/beater/api/intake/handler.go +++ b/beater/api/intake/handler.go @@ -81,6 +81,8 @@ func Handler(handler StreamHandler, requestMetadataFunc RequestMetadataFunc, bat return } + // copy batchProcessor to avoid updating closure below + batchProcessor := batchProcessor if limiter, ok := ratelimit.FromContext(c.Request.Context()); ok { // Apply rate limiting after reading but before processing any events. batchProcessor = modelprocessor.Chained{ diff --git a/beater/api/intake/handler_test.go b/beater/api/intake/handler_test.go index ed4074520f4..1c13b1fb88e 100644 --- a/beater/api/intake/handler_test.go +++ b/beater/api/intake/handler_test.go @@ -217,6 +217,32 @@ func TestRateLimiting(t *testing.T) { } } +func TestRateLimitingRequests(t *testing.T) { + // Check that rate limiting across multiple requests is handled correctly. + // + // ratelimit.ndjson contains 19 events, and we rate limit in batches of 10 + // events. The burst of 41 should be enough for 2 iterations with one left. + limiter := rate.NewLimiter(1, 41) + processor := stream.BackendProcessor(config.DefaultConfig()) + handler := Handler(processor, emptyRequestMetadata, modelprocessor.Nop{}) + + data, err := ioutil.ReadFile("../../../testdata/intake-v2/ratelimit.ndjson") + require.NoError(t, err) + for i := 0; i < 2; i++ { + r := httptest.NewRequest("POST", "/", bytes.NewBuffer(data)) + r = r.WithContext(ratelimit.ContextWithLimiter(r.Context(), limiter)) + r.Header.Add("Content-Type", "application/x-ndjson") + + w := httptest.NewRecorder() + c := request.NewContext() + c.Reset(w, r) + handler(c) + assert.Equal(t, http.StatusAccepted, w.Code) + } + assert.True(t, limiter.Allow()) + assert.False(t, limiter.Allow()) +} + type testcaseIntakeHandler struct { c *request.Context w *httptest.ResponseRecorder diff --git a/systemtest/rum_test.go b/systemtest/rum_test.go index 4d82b0a71ef..59a4ef03764 100644 --- a/systemtest/rum_test.go +++ b/systemtest/rum_test.go @@ -184,13 +184,11 @@ func TestRUMRateLimit(t *testing.T) { // Just check that rate limiting is wired up. More specific rate limiting scenarios are unit tested. var g errgroup.Group - g.Go(func() error { return sendEvents("10.11.12.13", srv.Config.RUM.RateLimit.EventLimit) }) - g.Go(func() error { return sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit) }) + g.Go(func() error { return sendEvents("10.11.12.13", srv.Config.RUM.RateLimit.EventLimit*3) }) + g.Go(func() error { return sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit*3) }) assert.NoError(t, g.Wait()) g = errgroup.Group{} - g.Go(func() error { return sendEvents("10.11.12.13", srv.Config.RUM.RateLimit.EventLimit) }) - g.Go(func() error { return sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit) }) g.Go(func() error { return sendEvents("10.11.12.15", srv.Config.RUM.RateLimit.EventLimit) }) err = g.Wait() require.Error(t, err)