Skip to content

Commit

Permalink
Move rate limiting and service name restrictions out of processor/str…
Browse files Browse the repository at this point in the history
…eam (elastic#5492)

* Move rate-limiting out of processor/stream

... and into beater/api/intake, injected into
the stream processor as a batch processor. This
paves the way for taking a similar approach for
otel.

* processor/stream: unexport some things

Unexport some things that are no longer needed for tests.

* Move allowed service names out of processor/stream

... and into beater/api, injected into the stream
processor as a batch processor. This paves the way
for taking a similar approach for otel.

* beater/api/intake: revise rate-limiting

Rate limit after reading but before processing.
Simpler and in line with what we will do for OTel.

* Remove stale comment

* Reinstate initial Allow

* beater/api/intake: add missing return
  • Loading branch information
axw authored and stuartnelson3 committed Jun 22, 2021
1 parent db22ff0 commit 778d0e9
Show file tree
Hide file tree
Showing 12 changed files with 246 additions and 245 deletions.
59 changes: 47 additions & 12 deletions beater/api/intake/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io"
"net/http"
"strings"
"time"

"golang.org/x/time/rate"

Expand All @@ -33,10 +34,16 @@ import (
"github.com/elastic/apm-server/beater/request"
"github.com/elastic/apm-server/decoder"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/model/modelprocessor"
"github.com/elastic/apm-server/processor/stream"
"github.com/elastic/apm-server/publish"
)

const (
batchSize = 10
rateLimitTimeout = time.Second
)

var (
// MonitoringMap holds a mapping for request.IDs to monitoring counters
MonitoringMap = request.DefaultMonitoringMapForRegistry(registry)
Expand All @@ -45,34 +52,45 @@ var (
errMethodNotAllowed = errors.New("only POST requests are supported")
errServerShuttingDown = errors.New("server is shutting down")
errInvalidContentType = errors.New("invalid content type")
errRateLimitExceeded = stream.ErrRateLimitExceeded
errRateLimitExceeded = errors.New("rate limit exceeded")
)

// StreamHandler is an interface for handling an Elastic APM agent ND-JSON event
// stream, implemented by processor/stream.
type StreamHandler interface {
HandleStream(
context.Context,
*rate.Limiter,
*model.Metadata,
io.Reader,
model.BatchProcessor,
*stream.Result,
ctx context.Context,
meta *model.Metadata,
stream io.Reader,
batchSize int,
processor model.BatchProcessor,
out *stream.Result,
) error
}

// Handler returns a request.Handler for managing intake requests for backend and rum events.
func Handler(handler StreamHandler, batchProcessor model.BatchProcessor) request.Handler {
return func(c *request.Context) {

if err := validateRequest(c); err != nil {
writeError(c, err)
return
}

if c.RateLimiter != nil && !c.RateLimiter.Allow() {
writeError(c, errRateLimitExceeded)
return
if c.RateLimiter != nil {
// Call Allow once for each stream to avoid burning CPU before we
// begin reading for the first time. This prevents clients from
// repeatedly connecting and sending < batchSize events and
// disconnecting before being rate limited.
if !c.RateLimiter.Allow() {
writeError(c, errRateLimitExceeded)
return
}

// Apply rate limiting after reading but before processing any events.
batchProcessor = modelprocessor.Chained{
rateLimitBatchProcessor(c.RateLimiter, batchSize),
batchProcessor,
}
}

reader, err := decoder.CompressedRequestReader(c.Request)
Expand All @@ -90,9 +108,9 @@ func Handler(handler StreamHandler, batchProcessor model.BatchProcessor) request
var result stream.Result
if err := handler.HandleStream(
c.Request.Context(),
c.RateLimiter,
&metadata,
reader,
batchSize,
batchProcessor,
&result,
); err != nil {
Expand All @@ -102,6 +120,23 @@ func Handler(handler StreamHandler, batchProcessor model.BatchProcessor) request
}
}

func rateLimitBatchProcessor(limiter *rate.Limiter, batchSize int) model.ProcessBatchFunc {
return func(ctx context.Context, _ *model.Batch) error {
return rateLimitBatch(ctx, limiter, batchSize)
}
}

// rateLimitBatch waits up to one second for the rate limiter to allow
// batchSize events to be read and processed.
func rateLimitBatch(ctx context.Context, limiter *rate.Limiter, batchSize int) error {
ctx, cancel := context.WithTimeout(ctx, rateLimitTimeout)
defer cancel()
if err := limiter.WaitN(ctx, batchSize); err != nil {
return errRateLimitExceeded
}
return nil
}

func validateRequest(c *request.Context) error {
if c.Request.Method != http.MethodPost {
return errMethodNotAllowed
Expand Down
80 changes: 64 additions & 16 deletions beater/api/intake/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,23 @@ import (
"net/http/httptest"
"path/filepath"
"testing"

"github.com/elastic/apm-server/approvaltest"
"github.com/elastic/apm-server/beater/api/ratelimit"
"github.com/elastic/apm-server/model"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"

"github.com/elastic/apm-server/approvaltest"
"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/beater/headers"
"github.com/elastic/apm-server/beater/request"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/model/modelprocessor"
"github.com/elastic/apm-server/processor/stream"
"github.com/elastic/apm-server/publish"
)

func TestIntakeHandler(t *testing.T) {
var rateLimit, err = ratelimit.NewStore(1, 0, 0)
require.NoError(t, err)
for name, tc := range map[string]testcaseIntakeHandler{
"Method": {
path: "errors.ndjson",
Expand All @@ -60,11 +59,6 @@ func TestIntakeHandler(t *testing.T) {
}(),
code: http.StatusBadRequest, id: request.IDResponseErrorsValidate,
},
"RateLimit": {
path: "errors.ndjson",
rateLimit: rateLimit,
code: http.StatusTooManyRequests, id: request.IDResponseErrorsRateLimit,
},
"BodyReader": {
path: "errors.ndjson",
r: func() *http.Request {
Expand Down Expand Up @@ -141,9 +135,6 @@ func TestIntakeHandler(t *testing.T) {
// setup
tc.setup(t)

if tc.rateLimit != nil {
tc.c.RateLimiter = tc.rateLimit.ForIP(&http.Request{})
}
// call handler
h := Handler(tc.processor, tc.batchProcessor)
h(tc.c)
Expand All @@ -164,12 +155,69 @@ func TestIntakeHandler(t *testing.T) {
}
}

func TestRateLimiting(t *testing.T) {
type test struct {
limiter *rate.Limiter
preconsumed int
expectLimited bool
}

for name, test := range map[string]test{
"LimiterAllowAll": {
limiter: rate.NewLimiter(rate.Limit(40), 40*5),
expectLimited: false,
},
"LimiterPartiallyUsedLimitAllow": {
limiter: rate.NewLimiter(rate.Limit(10), 10*2),
preconsumed: 10,
expectLimited: false,
},
"LimiterDenyAll": {
limiter: rate.NewLimiter(rate.Limit(0), 2),
expectLimited: true,
},
"LimiterPartiallyUsedLimitDeny": {
limiter: rate.NewLimiter(rate.Limit(7), 7*2),
preconsumed: 10,
expectLimited: true,
},
"LimiterDeny": {
limiter: rate.NewLimiter(rate.Limit(6), 6*2),
expectLimited: true,
},
} {
t.Run(name, func(t *testing.T) {
var tc testcaseIntakeHandler
tc.path = "ratelimit.ndjson"
tc.setup(t)
tc.c.RateLimiter = test.limiter
if test.preconsumed > 0 {
test.limiter.AllowN(time.Now(), test.preconsumed)
}

h := Handler(tc.processor, tc.batchProcessor)
h(tc.c)

if test.expectLimited {
assert.Equal(t, request.IDResponseErrorsRateLimit, tc.c.Result.ID)
assert.Equal(t, http.StatusTooManyRequests, tc.w.Code)
assert.Error(t, tc.c.Result.Err)
} else {
assert.Equal(t, request.IDResponseValidAccepted, tc.c.Result.ID)
assert.Equal(t, http.StatusAccepted, tc.w.Code)
assert.NoError(t, tc.c.Result.Err)
}
assert.NotZero(t, tc.w.Body.Len())
approvaltest.ApproveJSON(t, "test_approved/"+t.Name(), tc.w.Body.Bytes())
})
}
}

type testcaseIntakeHandler struct {
c *request.Context
w *httptest.ResponseRecorder
r *http.Request
processor *stream.Processor
rateLimit *ratelimit.Store
batchProcessor model.BatchProcessor
path string

Expand All @@ -183,7 +231,7 @@ func (tc *testcaseIntakeHandler) setup(t *testing.T) {
tc.processor = stream.BackendProcessor(cfg)
}
if tc.batchProcessor == nil {
tc.batchProcessor = model.ProcessBatchFunc(func(context.Context, *model.Batch) error { return nil })
tc.batchProcessor = modelprocessor.Nop{}
}

if tc.r == nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"accepted": 19
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"accepted": 10,
"errors": [
{
"message": "rate limit exceeded"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"accepted": 19
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"accepted": 10,
"errors": [
{
"message": "rate limit exceeded"
}
]
}
43 changes: 33 additions & 10 deletions beater/api/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package api

import (
"context"
"net/http"
"net/http/pprof"

Expand All @@ -37,6 +38,7 @@ import (
"github.com/elastic/apm-server/beater/request"
logs "github.com/elastic/apm-server/log"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/model/modelprocessor"
"github.com/elastic/apm-server/processor/stream"
"github.com/elastic/apm-server/publish"
)
Expand Down Expand Up @@ -100,8 +102,8 @@ func NewMux(
{AssetSourcemapPath, builder.sourcemapHandler},
{AgentConfigPath, builder.backendAgentConfigHandler(fetcher)},
{AgentConfigRUMPath, builder.rumAgentConfigHandler(fetcher)},
{IntakeRUMPath, builder.rumIntakeHandler},
{IntakeRUMV3Path, builder.rumV3IntakeHandler},
{IntakeRUMPath, builder.rumIntakeHandler(stream.RUMV2Processor)},
{IntakeRUMV3Path, builder.rumIntakeHandler(stream.RUMV3Processor)},
{IntakePath, builder.backendIntakeHandler},
// The profile endpoint is in Beta
{ProfilePath, builder.profileHandler},
Expand Down Expand Up @@ -152,14 +154,13 @@ func (r *routeBuilder) backendIntakeHandler() (request.Handler, error) {
return middleware.Wrap(h, backendMiddleware(r.cfg, authHandler, intake.MonitoringMap)...)
}

func (r *routeBuilder) rumIntakeHandler() (request.Handler, error) {
h := intake.Handler(stream.RUMV2Processor(r.cfg), r.batchProcessor)
return middleware.Wrap(h, rumMiddleware(r.cfg, nil, intake.MonitoringMap)...)
}

func (r *routeBuilder) rumV3IntakeHandler() (request.Handler, error) {
h := intake.Handler(stream.RUMV3Processor(r.cfg), r.batchProcessor)
return middleware.Wrap(h, rumMiddleware(r.cfg, nil, intake.MonitoringMap)...)
func (r *routeBuilder) rumIntakeHandler(newProcessor func(*config.Config) *stream.Processor) func() (request.Handler, error) {
return func() (request.Handler, error) {
batchProcessor := r.batchProcessor
batchProcessor = batchProcessorWithAllowedServiceNames(batchProcessor, r.cfg.RumConfig.AllowServiceNames)
h := intake.Handler(newProcessor(r.cfg), batchProcessor)
return middleware.Wrap(h, rumMiddleware(r.cfg, nil, intake.MonitoringMap)...)
}
}

func (r *routeBuilder) sourcemapHandler() (request.Handler, error) {
Expand Down Expand Up @@ -265,3 +266,25 @@ func rootMiddleware(cfg *config.Config, auth *authorization.Handler) []middlewar
middleware.ResponseHeadersMiddleware(cfg.ResponseHeaders),
middleware.AuthorizationMiddleware(auth, false))
}

// TODO(axw) move this into the authorization package when introducing anonymous auth.
func batchProcessorWithAllowedServiceNames(p model.BatchProcessor, allowedServiceNames []string) model.BatchProcessor {
if len(allowedServiceNames) == 0 {
// All service names are allowed.
return p
}
m := make(map[string]bool)
for _, name := range allowedServiceNames {
m[name] = true
}
var restrictServiceName modelprocessor.MetadataProcessorFunc = func(ctx context.Context, meta *model.Metadata) error {
// Restrict to explicitly allowed service names.
// The list of allowed service names is not considered secret,
// so we do not use constant time comparison.
if !m[meta.Service.Name] {
return &stream.InvalidInputError{Message: "service name is not allowed"}
}
return nil
}
return modelprocessor.Chained{restrictServiceName, p}
}
35 changes: 35 additions & 0 deletions beater/api/mux_intake_rum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package api

import (
"bytes"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
Expand Down Expand Up @@ -121,6 +123,39 @@ func TestRumHandler_MonitoringMiddleware(t *testing.T) {
}
}

func TestRUMHandler_AllowedServiceNames(t *testing.T) {
payload, err := ioutil.ReadFile("../../testdata/intake-v2/transactions_spans_rum.ndjson")
require.NoError(t, err)

for _, test := range []struct {
AllowServiceNames []string
Allowed bool
}{{
AllowServiceNames: nil,
Allowed: true, // none specified = all allowed
}, {
AllowServiceNames: []string{"apm-agent-js"}, // matches what's in test data
Allowed: true,
}, {
AllowServiceNames: []string{"reject_everything"},
Allowed: false,
}} {
cfg := cfgEnabledRUM()
cfg.RumConfig.AllowServiceNames = test.AllowServiceNames
h := newTestMux(t, cfg)

req := httptest.NewRequest(http.MethodPost, "/intake/v2/rum/events", bytes.NewReader(payload))
req.Header.Set("Content-Type", "application/x-ndjson")
w := httptest.NewRecorder()
h.ServeHTTP(w, req)
if test.Allowed {
assert.Equal(t, http.StatusAccepted, w.Code)
} else {
assert.Equal(t, http.StatusBadRequest, w.Code)
}
}
}

func cfgEnabledRUM() *config.Config {
cfg := config.DefaultConfig()
cfg.RumConfig.Enabled = true
Expand Down
Loading

0 comments on commit 778d0e9

Please sign in to comment.