From f5664d8196599758da0dd1e1c88771d24e0f4870 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Sat, 19 Jun 2021 10:58:03 +0800 Subject: [PATCH 1/5] Move rate limiting and service name restrictions out of processor/stream (#5492) * Move rate-limiting out of processor/stream ... and into beater/api/intake, injected into the stream processor as a batch processor. This paves the way for taking a similar approach for otel. * processor/stream: unexport some things Unexport some things that are no longer needed for tests. * Move allowed service names out of processor/stream ... and into beater/api, injected into the stream processor as a batch processor. This paves the way for taking a similar approach for otel. * beater/api/intake: revise rate-limiting Rate limit after reading but before processing. Simpler and in line with what we will do for OTel. * Remove stale comment * Reinstate initial Allow * beater/api/intake: add missing return --- beater/api/intake/handler.go | 59 ++++++++-- beater/api/intake/handler_test.go | 80 ++++++++++--- .../LimiterAllowAll.approved.json | 3 + .../LimiterDeny.approved.json | 8 ++ .../LimiterDenyAll.approved.json} | 0 ...miterPartiallyUsedLimitAllow.approved.json | 3 + ...imiterPartiallyUsedLimitDeny.approved.json | 8 ++ beater/api/mux.go | 43 +++++-- beater/api/mux_intake_rum_test.go | 35 ++++++ processor/stream/benchmark_test.go | 43 +++---- processor/stream/processor.go | 102 ++++------------- processor/stream/processor_test.go | 107 +----------------- 12 files changed, 246 insertions(+), 245 deletions(-) create mode 100644 beater/api/intake/test_approved/TestRateLimiting/LimiterAllowAll.approved.json create mode 100644 beater/api/intake/test_approved/TestRateLimiting/LimiterDeny.approved.json rename beater/api/intake/test_approved/{RateLimit.approved.json => TestRateLimiting/LimiterDenyAll.approved.json} (100%) create mode 100644 beater/api/intake/test_approved/TestRateLimiting/LimiterPartiallyUsedLimitAllow.approved.json create mode 100644 beater/api/intake/test_approved/TestRateLimiting/LimiterPartiallyUsedLimitDeny.approved.json diff --git a/beater/api/intake/handler.go b/beater/api/intake/handler.go index 1bffb4421a8..2bfb47b7037 100644 --- a/beater/api/intake/handler.go +++ b/beater/api/intake/handler.go @@ -24,6 +24,7 @@ import ( "io" "net/http" "strings" + "time" "golang.org/x/time/rate" @@ -33,10 +34,16 @@ import ( "github.com/elastic/apm-server/beater/request" "github.com/elastic/apm-server/decoder" "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/model/modelprocessor" "github.com/elastic/apm-server/processor/stream" "github.com/elastic/apm-server/publish" ) +const ( + batchSize = 10 + rateLimitTimeout = time.Second +) + var ( // MonitoringMap holds a mapping for request.IDs to monitoring counters MonitoringMap = request.DefaultMonitoringMapForRegistry(registry) @@ -45,34 +52,45 @@ var ( errMethodNotAllowed = errors.New("only POST requests are supported") errServerShuttingDown = errors.New("server is shutting down") errInvalidContentType = errors.New("invalid content type") - errRateLimitExceeded = stream.ErrRateLimitExceeded + errRateLimitExceeded = errors.New("rate limit exceeded") ) // StreamHandler is an interface for handling an Elastic APM agent ND-JSON event // stream, implemented by processor/stream. type StreamHandler interface { HandleStream( - context.Context, - *rate.Limiter, - *model.Metadata, - io.Reader, - model.BatchProcessor, - *stream.Result, + ctx context.Context, + meta *model.Metadata, + stream io.Reader, + batchSize int, + processor model.BatchProcessor, + out *stream.Result, ) error } // Handler returns a request.Handler for managing intake requests for backend and rum events. func Handler(handler StreamHandler, batchProcessor model.BatchProcessor) request.Handler { return func(c *request.Context) { - if err := validateRequest(c); err != nil { writeError(c, err) return } - if c.RateLimiter != nil && !c.RateLimiter.Allow() { - writeError(c, errRateLimitExceeded) - return + if c.RateLimiter != nil { + // Call Allow once for each stream to avoid burning CPU before we + // begin reading for the first time. This prevents clients from + // repeatedly connecting and sending < batchSize events and + // disconnecting before being rate limited. + if !c.RateLimiter.Allow() { + writeError(c, errRateLimitExceeded) + return + } + + // Apply rate limiting after reading but before processing any events. + batchProcessor = modelprocessor.Chained{ + rateLimitBatchProcessor(c.RateLimiter, batchSize), + batchProcessor, + } } reader, err := decoder.CompressedRequestReader(c.Request) @@ -90,9 +108,9 @@ func Handler(handler StreamHandler, batchProcessor model.BatchProcessor) request var result stream.Result if err := handler.HandleStream( c.Request.Context(), - c.RateLimiter, &metadata, reader, + batchSize, batchProcessor, &result, ); err != nil { @@ -102,6 +120,23 @@ func Handler(handler StreamHandler, batchProcessor model.BatchProcessor) request } } +func rateLimitBatchProcessor(limiter *rate.Limiter, batchSize int) model.ProcessBatchFunc { + return func(ctx context.Context, _ *model.Batch) error { + return rateLimitBatch(ctx, limiter, batchSize) + } +} + +// rateLimitBatch waits up to one second for the rate limiter to allow +// batchSize events to be read and processed. +func rateLimitBatch(ctx context.Context, limiter *rate.Limiter, batchSize int) error { + ctx, cancel := context.WithTimeout(ctx, rateLimitTimeout) + defer cancel() + if err := limiter.WaitN(ctx, batchSize); err != nil { + return errRateLimitExceeded + } + return nil +} + func validateRequest(c *request.Context) error { if c.Request.Method != http.MethodPost { return errMethodNotAllowed diff --git a/beater/api/intake/handler_test.go b/beater/api/intake/handler_test.go index 61923c0bf92..5a4b7cf1b4d 100644 --- a/beater/api/intake/handler_test.go +++ b/beater/api/intake/handler_test.go @@ -27,24 +27,23 @@ import ( "net/http/httptest" "path/filepath" "testing" - - "github.com/elastic/apm-server/approvaltest" - "github.com/elastic/apm-server/beater/api/ratelimit" - "github.com/elastic/apm-server/model" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/time/rate" + "github.com/elastic/apm-server/approvaltest" "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/beater/headers" "github.com/elastic/apm-server/beater/request" + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/model/modelprocessor" "github.com/elastic/apm-server/processor/stream" "github.com/elastic/apm-server/publish" ) func TestIntakeHandler(t *testing.T) { - var rateLimit, err = ratelimit.NewStore(1, 0, 0) - require.NoError(t, err) for name, tc := range map[string]testcaseIntakeHandler{ "Method": { path: "errors.ndjson", @@ -60,11 +59,6 @@ func TestIntakeHandler(t *testing.T) { }(), code: http.StatusBadRequest, id: request.IDResponseErrorsValidate, }, - "RateLimit": { - path: "errors.ndjson", - rateLimit: rateLimit, - code: http.StatusTooManyRequests, id: request.IDResponseErrorsRateLimit, - }, "BodyReader": { path: "errors.ndjson", r: func() *http.Request { @@ -141,9 +135,6 @@ func TestIntakeHandler(t *testing.T) { // setup tc.setup(t) - if tc.rateLimit != nil { - tc.c.RateLimiter = tc.rateLimit.ForIP(&http.Request{}) - } // call handler h := Handler(tc.processor, tc.batchProcessor) h(tc.c) @@ -164,12 +155,69 @@ func TestIntakeHandler(t *testing.T) { } } +func TestRateLimiting(t *testing.T) { + type test struct { + limiter *rate.Limiter + preconsumed int + expectLimited bool + } + + for name, test := range map[string]test{ + "LimiterAllowAll": { + limiter: rate.NewLimiter(rate.Limit(40), 40*5), + expectLimited: false, + }, + "LimiterPartiallyUsedLimitAllow": { + limiter: rate.NewLimiter(rate.Limit(10), 10*2), + preconsumed: 10, + expectLimited: false, + }, + "LimiterDenyAll": { + limiter: rate.NewLimiter(rate.Limit(0), 2), + expectLimited: true, + }, + "LimiterPartiallyUsedLimitDeny": { + limiter: rate.NewLimiter(rate.Limit(7), 7*2), + preconsumed: 10, + expectLimited: true, + }, + "LimiterDeny": { + limiter: rate.NewLimiter(rate.Limit(6), 6*2), + expectLimited: true, + }, + } { + t.Run(name, func(t *testing.T) { + var tc testcaseIntakeHandler + tc.path = "ratelimit.ndjson" + tc.setup(t) + tc.c.RateLimiter = test.limiter + if test.preconsumed > 0 { + test.limiter.AllowN(time.Now(), test.preconsumed) + } + + h := Handler(tc.processor, tc.batchProcessor) + h(tc.c) + + if test.expectLimited { + assert.Equal(t, request.IDResponseErrorsRateLimit, tc.c.Result.ID) + assert.Equal(t, http.StatusTooManyRequests, tc.w.Code) + assert.Error(t, tc.c.Result.Err) + } else { + assert.Equal(t, request.IDResponseValidAccepted, tc.c.Result.ID) + assert.Equal(t, http.StatusAccepted, tc.w.Code) + assert.NoError(t, tc.c.Result.Err) + } + assert.NotZero(t, tc.w.Body.Len()) + approvaltest.ApproveJSON(t, "test_approved/"+t.Name(), tc.w.Body.Bytes()) + }) + } +} + type testcaseIntakeHandler struct { c *request.Context w *httptest.ResponseRecorder r *http.Request processor *stream.Processor - rateLimit *ratelimit.Store batchProcessor model.BatchProcessor path string @@ -183,7 +231,7 @@ func (tc *testcaseIntakeHandler) setup(t *testing.T) { tc.processor = stream.BackendProcessor(cfg) } if tc.batchProcessor == nil { - tc.batchProcessor = model.ProcessBatchFunc(func(context.Context, *model.Batch) error { return nil }) + tc.batchProcessor = modelprocessor.Nop{} } if tc.r == nil { diff --git a/beater/api/intake/test_approved/TestRateLimiting/LimiterAllowAll.approved.json b/beater/api/intake/test_approved/TestRateLimiting/LimiterAllowAll.approved.json new file mode 100644 index 00000000000..c612a4faf20 --- /dev/null +++ b/beater/api/intake/test_approved/TestRateLimiting/LimiterAllowAll.approved.json @@ -0,0 +1,3 @@ +{ + "accepted": 19 +} diff --git a/beater/api/intake/test_approved/TestRateLimiting/LimiterDeny.approved.json b/beater/api/intake/test_approved/TestRateLimiting/LimiterDeny.approved.json new file mode 100644 index 00000000000..8333c5a198a --- /dev/null +++ b/beater/api/intake/test_approved/TestRateLimiting/LimiterDeny.approved.json @@ -0,0 +1,8 @@ +{ + "accepted": 10, + "errors": [ + { + "message": "rate limit exceeded" + } + ] +} diff --git a/beater/api/intake/test_approved/RateLimit.approved.json b/beater/api/intake/test_approved/TestRateLimiting/LimiterDenyAll.approved.json similarity index 100% rename from beater/api/intake/test_approved/RateLimit.approved.json rename to beater/api/intake/test_approved/TestRateLimiting/LimiterDenyAll.approved.json diff --git a/beater/api/intake/test_approved/TestRateLimiting/LimiterPartiallyUsedLimitAllow.approved.json b/beater/api/intake/test_approved/TestRateLimiting/LimiterPartiallyUsedLimitAllow.approved.json new file mode 100644 index 00000000000..c612a4faf20 --- /dev/null +++ b/beater/api/intake/test_approved/TestRateLimiting/LimiterPartiallyUsedLimitAllow.approved.json @@ -0,0 +1,3 @@ +{ + "accepted": 19 +} diff --git a/beater/api/intake/test_approved/TestRateLimiting/LimiterPartiallyUsedLimitDeny.approved.json b/beater/api/intake/test_approved/TestRateLimiting/LimiterPartiallyUsedLimitDeny.approved.json new file mode 100644 index 00000000000..8333c5a198a --- /dev/null +++ b/beater/api/intake/test_approved/TestRateLimiting/LimiterPartiallyUsedLimitDeny.approved.json @@ -0,0 +1,8 @@ +{ + "accepted": 10, + "errors": [ + { + "message": "rate limit exceeded" + } + ] +} diff --git a/beater/api/mux.go b/beater/api/mux.go index ef6e53afe69..55e9c1ee73f 100644 --- a/beater/api/mux.go +++ b/beater/api/mux.go @@ -18,6 +18,7 @@ package api import ( + "context" "net/http" "net/http/pprof" @@ -37,6 +38,7 @@ import ( "github.com/elastic/apm-server/beater/request" logs "github.com/elastic/apm-server/log" "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/model/modelprocessor" "github.com/elastic/apm-server/processor/stream" "github.com/elastic/apm-server/publish" ) @@ -100,8 +102,8 @@ func NewMux( {AssetSourcemapPath, builder.sourcemapHandler}, {AgentConfigPath, builder.backendAgentConfigHandler(fetcher)}, {AgentConfigRUMPath, builder.rumAgentConfigHandler(fetcher)}, - {IntakeRUMPath, builder.rumIntakeHandler}, - {IntakeRUMV3Path, builder.rumV3IntakeHandler}, + {IntakeRUMPath, builder.rumIntakeHandler(stream.RUMV2Processor)}, + {IntakeRUMV3Path, builder.rumIntakeHandler(stream.RUMV3Processor)}, {IntakePath, builder.backendIntakeHandler}, // The profile endpoint is in Beta {ProfilePath, builder.profileHandler}, @@ -152,14 +154,13 @@ func (r *routeBuilder) backendIntakeHandler() (request.Handler, error) { return middleware.Wrap(h, backendMiddleware(r.cfg, authHandler, intake.MonitoringMap)...) } -func (r *routeBuilder) rumIntakeHandler() (request.Handler, error) { - h := intake.Handler(stream.RUMV2Processor(r.cfg), r.batchProcessor) - return middleware.Wrap(h, rumMiddleware(r.cfg, nil, intake.MonitoringMap)...) -} - -func (r *routeBuilder) rumV3IntakeHandler() (request.Handler, error) { - h := intake.Handler(stream.RUMV3Processor(r.cfg), r.batchProcessor) - return middleware.Wrap(h, rumMiddleware(r.cfg, nil, intake.MonitoringMap)...) +func (r *routeBuilder) rumIntakeHandler(newProcessor func(*config.Config) *stream.Processor) func() (request.Handler, error) { + return func() (request.Handler, error) { + batchProcessor := r.batchProcessor + batchProcessor = batchProcessorWithAllowedServiceNames(batchProcessor, r.cfg.RumConfig.AllowServiceNames) + h := intake.Handler(newProcessor(r.cfg), batchProcessor) + return middleware.Wrap(h, rumMiddleware(r.cfg, nil, intake.MonitoringMap)...) + } } func (r *routeBuilder) sourcemapHandler() (request.Handler, error) { @@ -265,3 +266,25 @@ func rootMiddleware(cfg *config.Config, auth *authorization.Handler) []middlewar middleware.ResponseHeadersMiddleware(cfg.ResponseHeaders), middleware.AuthorizationMiddleware(auth, false)) } + +// TODO(axw) move this into the authorization package when introducing anonymous auth. +func batchProcessorWithAllowedServiceNames(p model.BatchProcessor, allowedServiceNames []string) model.BatchProcessor { + if len(allowedServiceNames) == 0 { + // All service names are allowed. + return p + } + m := make(map[string]bool) + for _, name := range allowedServiceNames { + m[name] = true + } + var restrictServiceName modelprocessor.MetadataProcessorFunc = func(ctx context.Context, meta *model.Metadata) error { + // Restrict to explicitly allowed service names. + // The list of allowed service names is not considered secret, + // so we do not use constant time comparison. + if !m[meta.Service.Name] { + return &stream.InvalidInputError{Message: "service name is not allowed"} + } + return nil + } + return modelprocessor.Chained{restrictServiceName, p} +} diff --git a/beater/api/mux_intake_rum_test.go b/beater/api/mux_intake_rum_test.go index c1d35d25548..5bf99c1ee44 100644 --- a/beater/api/mux_intake_rum_test.go +++ b/beater/api/mux_intake_rum_test.go @@ -18,6 +18,8 @@ package api import ( + "bytes" + "io/ioutil" "net/http" "net/http/httptest" "testing" @@ -121,6 +123,39 @@ func TestRumHandler_MonitoringMiddleware(t *testing.T) { } } +func TestRUMHandler_AllowedServiceNames(t *testing.T) { + payload, err := ioutil.ReadFile("../../testdata/intake-v2/transactions_spans_rum.ndjson") + require.NoError(t, err) + + for _, test := range []struct { + AllowServiceNames []string + Allowed bool + }{{ + AllowServiceNames: nil, + Allowed: true, // none specified = all allowed + }, { + AllowServiceNames: []string{"apm-agent-js"}, // matches what's in test data + Allowed: true, + }, { + AllowServiceNames: []string{"reject_everything"}, + Allowed: false, + }} { + cfg := cfgEnabledRUM() + cfg.RumConfig.AllowServiceNames = test.AllowServiceNames + h := newTestMux(t, cfg) + + req := httptest.NewRequest(http.MethodPost, "/intake/v2/rum/events", bytes.NewReader(payload)) + req.Header.Set("Content-Type", "application/x-ndjson") + w := httptest.NewRecorder() + h.ServeHTTP(w, req) + if test.Allowed { + assert.Equal(t, http.StatusAccepted, w.Code) + } else { + assert.Equal(t, http.StatusBadRequest, w.Code) + } + } +} + func cfgEnabledRUM() *config.Config { cfg := config.DefaultConfig() cfg.RumConfig.Enabled = true diff --git a/processor/stream/benchmark_test.go b/processor/stream/benchmark_test.go index ef1df774f1a..1ec5ba7ea53 100644 --- a/processor/stream/benchmark_test.go +++ b/processor/stream/benchmark_test.go @@ -21,12 +21,9 @@ import ( "bytes" "context" "io/ioutil" - "math" "path/filepath" "testing" - "golang.org/x/time/rate" - "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/model" ) @@ -44,36 +41,30 @@ func BenchmarkRUMV3Processor(b *testing.B) { } func benchmarkStreamProcessor(b *testing.B, processor *Processor, files []string) { + const batchSize = 10 batchProcessor := nopBatchProcessor{} + benchmark := func(b *testing.B, filename string) { + data, err := ioutil.ReadFile(filename) + if err != nil { + b.Error(err) + } + r := bytes.NewReader(data) + b.ReportAllocs() + b.SetBytes(int64(len(data))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + r.Reset(data) + b.StartTimer() - //ensure to not hit rate limit as blocking wait would be measured otherwise - rl := rate.NewLimiter(rate.Limit(math.MaxFloat64-1), math.MaxInt32) - - benchmark := func(filename string, rl *rate.Limiter) func(b *testing.B) { - return func(b *testing.B) { - data, err := ioutil.ReadFile(filename) - if err != nil { - b.Error(err) - } - r := bytes.NewReader(data) - b.ReportAllocs() - b.SetBytes(int64(len(data))) - b.ResetTimer() - for i := 0; i < b.N; i++ { - b.StopTimer() - r.Reset(data) - b.StartTimer() - - var result Result - processor.HandleStream(context.Background(), rl, &model.Metadata{}, r, batchProcessor, &result) - } + var result Result + processor.HandleStream(context.Background(), &model.Metadata{}, r, batchSize, batchProcessor, &result) } } for _, f := range files { b.Run(filepath.Base(f), func(b *testing.B) { - b.Run("NoRateLimit", benchmark(f, nil)) - b.Run("WithRateLimit", benchmark(f, rl)) + benchmark(b, f) }) } } diff --git a/processor/stream/processor.go b/processor/stream/processor.go index 0fdfb2c45ea..bb01e3ce44f 100644 --- a/processor/stream/processor.go +++ b/processor/stream/processor.go @@ -24,8 +24,6 @@ import ( "sync" "time" - "golang.org/x/time/rate" - "go.elastic.co/apm" "github.com/pkg/errors" @@ -36,19 +34,14 @@ import ( "github.com/elastic/apm-server/model/modeldecoder" "github.com/elastic/apm-server/model/modeldecoder/rumv3" v2 "github.com/elastic/apm-server/model/modeldecoder/v2" - "github.com/elastic/apm-server/model/modelprocessor" "github.com/elastic/apm-server/utility" ) var ( - ErrUnrecognizedObject = errors.New("did not recognize object type") - - ErrRateLimitExceeded = errors.New("rate limit exceeded") + errUnrecognizedObject = errors.New("did not recognize object type") ) const ( - batchSize = 10 - errorEventType = "error" metricsetEventType = "metricset" spanEventType = "span" @@ -61,12 +54,11 @@ const ( type decodeMetadataFunc func(decoder.Decoder, *model.Metadata) error type Processor struct { - Mconfig modeldecoder.Config - MaxEventSize int - streamReaderPool sync.Pool - decodeMetadata decodeMetadataFunc - isRUM bool - allowedServiceNames map[string]bool + Mconfig modeldecoder.Config + MaxEventSize int + streamReaderPool sync.Pool + decodeMetadata decodeMetadataFunc + isRUM bool } func BackendProcessor(cfg *config.Config) *Processor { @@ -80,33 +72,20 @@ func BackendProcessor(cfg *config.Config) *Processor { func RUMV2Processor(cfg *config.Config) *Processor { return &Processor{ - Mconfig: modeldecoder.Config{Experimental: cfg.Mode == config.ModeExperimental}, - MaxEventSize: cfg.MaxEventSize, - decodeMetadata: v2.DecodeNestedMetadata, - isRUM: true, - allowedServiceNames: makeAllowedServiceNamesMap(cfg.RumConfig.AllowServiceNames), + Mconfig: modeldecoder.Config{Experimental: cfg.Mode == config.ModeExperimental}, + MaxEventSize: cfg.MaxEventSize, + decodeMetadata: v2.DecodeNestedMetadata, + isRUM: true, } } func RUMV3Processor(cfg *config.Config) *Processor { return &Processor{ - Mconfig: modeldecoder.Config{Experimental: cfg.Mode == config.ModeExperimental}, - MaxEventSize: cfg.MaxEventSize, - decodeMetadata: rumv3.DecodeNestedMetadata, - isRUM: true, - allowedServiceNames: makeAllowedServiceNamesMap(cfg.RumConfig.AllowServiceNames), - } -} - -func makeAllowedServiceNamesMap(allowed []string) map[string]bool { - if len(allowed) == 0 { - return nil - } - m := make(map[string]bool, len(allowed)) - for _, name := range allowed { - m[name] = true + Mconfig: modeldecoder.Config{Experimental: cfg.Mode == config.ModeExperimental}, + MaxEventSize: cfg.MaxEventSize, + decodeMetadata: rumv3.DecodeNestedMetadata, + isRUM: true, } - return m } func (p *Processor) readMetadata(reader *streamReader, metadata *model.Metadata) error { @@ -129,17 +108,13 @@ func (p *Processor) readMetadata(reader *streamReader, metadata *model.Metadata) return nil } -// IdentifyEventType takes a reader and reads ahead the first key of the +// identifyEventType takes a reader and reads ahead the first key of the // underlying json input. This method makes some assumptions met by the // input format: // - the input is in JSON format // - every valid ndjson line only has one root key // - the bytes that we must match on are ASCII -// -// NOTE(axw) this method really should not be exported, but it has to be -// for package_test. When we migrate that code to system tests, unexport -// this method. -func (p *Processor) IdentifyEventType(body []byte) []byte { +func (p *Processor) identifyEventType(body []byte) []byte { // find event type, trim spaces and account for single and double quotes var quote byte var key []byte @@ -163,7 +138,6 @@ func (p *Processor) IdentifyEventType(body []byte) []byte { // the error err. func (p *Processor) readBatch( ctx context.Context, - ipRateLimiter *rate.Limiter, requestTime time.Time, streamMetadata *model.Metadata, batchSize int, @@ -172,19 +146,6 @@ func (p *Processor) readBatch( result *Result, ) (int, error) { - if ipRateLimiter != nil { - // TODO(axw) move this logic somewhere central, so it - // can be used by processor/otel too. - // - // use provided rate limiter to throttle batch read - ctxT, cancel := context.WithTimeout(ctx, time.Second) - err := ipRateLimiter.WaitN(ctxT, batchSize) - cancel() - if err != nil { - return 0, ErrRateLimitExceeded - } - } - // input events are decoded and appended to the batch var n int for i := 0; i < batchSize && !reader.IsEOF(); i++ { @@ -208,7 +169,7 @@ func (p *Processor) readBatch( Metadata: *streamMetadata, Config: p.Mconfig, } - switch eventType := p.IdentifyEventType(body); string(eventType) { + switch eventType := p.identifyEventType(body); string(eventType) { case errorEventType: var event model.Error err := v2.DecodeNestedError(reader, &input, &event) @@ -275,7 +236,7 @@ func (p *Processor) readBatch( n += 1 + len(event.Metricsets) + len(event.Spans) default: result.LimitedAdd(&InvalidInputError{ - Message: errors.Wrap(ErrUnrecognizedObject, string(eventType)).Error(), + Message: errors.Wrap(errUnrecognizedObject, string(eventType)).Error(), Document: string(reader.LatestLine()), }) continue @@ -298,8 +259,8 @@ func handleDecodeErr(err error, r *streamReader, result *Result) bool { return true } -// HandleStream processes a stream of events, updating result as events are -// accepted, or per-event errors occur. +// HandleStream processes a stream of events in batches of batchSize at a time, +// updating result as events are accepted, or per-event errors occur. // // HandleStream will return an error when a terminal stream-level error occurs, // such as the rate limit being exceeded, or due to authorization errors. In @@ -308,9 +269,9 @@ func handleDecodeErr(err error, r *streamReader, result *Result) bool { // Callers must not access result concurrently with HandleStream. func (p *Processor) HandleStream( ctx context.Context, - ipRateLimiter *rate.Limiter, meta *model.Metadata, reader io.Reader, + batchSize int, processor model.BatchProcessor, result *Result, ) error { @@ -323,10 +284,6 @@ func (p *Processor) HandleStream( return err } - var allowedServiceNamesProcessor model.BatchProcessor = modelprocessor.Nop{} - if p.allowedServiceNames != nil { - allowedServiceNamesProcessor = modelprocessor.MetadataProcessorFunc(p.restrictAllowedServiceNames) - } requestTime := utility.RequestTime(ctx) sp, ctx := apm.StartSpan(ctx, "Stream", "Reporter") @@ -334,11 +291,8 @@ func (p *Processor) HandleStream( for { var batch model.Batch - n, readErr := p.readBatch(ctx, ipRateLimiter, requestTime, meta, batchSize, &batch, sr, result) + n, readErr := p.readBatch(ctx, requestTime, meta, batchSize, &batch, sr, result) if n > 0 { - if err := allowedServiceNamesProcessor.ProcessBatch(ctx, &batch); err != nil { - return err - } // NOTE(axw) ProcessBatch takes ownership of batch, which means we cannot reuse // the slice memory. We should investigate alternative interfaces between the // processor and publisher which would enable better memory reuse, e.g. by using @@ -358,18 +312,6 @@ func (p *Processor) HandleStream( return nil } -func (p *Processor) restrictAllowedServiceNames(ctx context.Context, meta *model.Metadata) error { - // Restrict to explicitly allowed service names. The list of - // allowed service names is not considered secret, so we do - // not use constant time comparison. - if !p.allowedServiceNames[meta.Service.Name] { - return &InvalidInputError{ - Message: "service name is not allowed", - } - } - return nil -} - // getStreamReader returns a streamReader that reads ND-JSON lines from r. func (p *Processor) getStreamReader(r io.Reader) *streamReader { if sr, ok := p.streamReaderPool.Get().(*streamReader); ok { diff --git a/processor/stream/processor_test.go b/processor/stream/processor_test.go index cd2a93ead6c..8eb5b109419 100644 --- a/processor/stream/processor_test.go +++ b/processor/stream/processor_test.go @@ -30,13 +30,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/time/rate" "github.com/elastic/apm-server/approvaltest" "github.com/elastic/apm-server/beater/beatertest" "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/model" - "github.com/elastic/apm-server/model/modelprocessor" "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/utility" @@ -57,7 +55,7 @@ func TestHandlerReadStreamError(t *testing.T) { sp := BackendProcessor(&config.Config{MaxEventSize: 100 * 1024}) var actualResult Result - err = sp.HandleStream(context.Background(), nil, &model.Metadata{}, timeoutReader, processor, &actualResult) + err = sp.HandleStream(context.Background(), &model.Metadata{}, timeoutReader, 10, processor, &actualResult) assert.EqualError(t, err, "timeout") assert.Equal(t, Result{Accepted: accepted}, actualResult) } @@ -83,8 +81,8 @@ func TestHandlerReportingStreamError(t *testing.T) { var actualResult Result err := sp.HandleStream( - context.Background(), nil, &model.Metadata{}, - bytes.NewReader(payload), processor, &actualResult, + context.Background(), &model.Metadata{}, + bytes.NewReader(payload), 10, processor, &actualResult, ) assert.Equal(t, test.err, err) assert.Zero(t, actualResult) @@ -184,7 +182,7 @@ func TestIntegrationESOutput(t *testing.T) { p := BackendProcessor(&config.Config{MaxEventSize: 100 * 1024}) var actualResult Result - err = p.HandleStream(ctx, nil, reqDecoderMeta, bytes.NewReader(payload), batchProcessor, &actualResult) + err = p.HandleStream(ctx, reqDecoderMeta, bytes.NewReader(payload), 10, batchProcessor, &actualResult) if test.err != nil { assert.Equal(t, test.err, err) } else { @@ -219,7 +217,7 @@ func TestIntegrationRum(t *testing.T) { p := RUMV2Processor(&config.Config{MaxEventSize: 100 * 1024}) var actualResult Result - err = p.HandleStream(ctx, nil, &reqDecoderMeta, bytes.NewReader(payload), batchProcessor, &actualResult) + err = p.HandleStream(ctx, &reqDecoderMeta, bytes.NewReader(payload), 10, batchProcessor, &actualResult) require.NoError(t, err) assert.Equal(t, Result{Accepted: accepted}, actualResult) }) @@ -250,106 +248,13 @@ func TestRUMV3(t *testing.T) { p := RUMV3Processor(&config.Config{MaxEventSize: 100 * 1024}) var actualResult Result - err = p.HandleStream(ctx, nil, &reqDecoderMeta, bytes.NewReader(payload), batchProcessor, &actualResult) + err = p.HandleStream(ctx, &reqDecoderMeta, bytes.NewReader(payload), 10, batchProcessor, &actualResult) require.NoError(t, err) assert.Equal(t, Result{Accepted: accepted}, actualResult) }) } } -func TestRUMAllowedServiceNames(t *testing.T) { - payload, err := ioutil.ReadFile("../../testdata/intake-v2/transactions_spans_rum.ndjson") - require.NoError(t, err) - - for _, test := range []struct { - AllowServiceNames []string - Allowed bool - }{{ - AllowServiceNames: nil, - Allowed: true, // none specified = all allowed - }, { - AllowServiceNames: []string{"apm-agent-js"}, // matches what's in test data - Allowed: true, - }, { - AllowServiceNames: []string{"reject_everything"}, - Allowed: false, - }} { - p := RUMV2Processor(&config.Config{ - MaxEventSize: 100 * 1024, - RumConfig: config.RumConfig{AllowServiceNames: test.AllowServiceNames}, - }) - - var result Result - err := p.HandleStream( - context.Background(), nil, &model.Metadata{}, bytes.NewReader(payload), - modelprocessor.Nop{}, &result, - ) - if test.Allowed { - require.NoError(t, err) - assert.Equal(t, Result{Accepted: 2}, result) - } else { - assert.EqualError(t, err, "service name is not allowed") - assert.Equal(t, Result{Accepted: 0}, result) - } - } -} - -func TestRateLimiting(t *testing.T) { - payload, err := ioutil.ReadFile("../../testdata/intake-v2/ratelimit.ndjson") - require.NoError(t, err) - - for _, test := range []struct { - name string - lim *rate.Limiter - hit int - accepted int - limited bool - }{{ - name: "LimiterDenyAll", - lim: rate.NewLimiter(rate.Limit(0), 2), - accepted: 0, - limited: true, - }, { - name: "LimiterAllowAll", - lim: rate.NewLimiter(rate.Limit(40), 40*5), - accepted: 19, - }, { - name: "LimiterPartiallyUsedLimitAllow", - lim: rate.NewLimiter(rate.Limit(10), 10*2), - hit: 10, - accepted: 19, - }, { - name: "LimiterPartiallyUsedLimitDeny", - lim: rate.NewLimiter(rate.Limit(7), 7*2), - hit: 10, - accepted: 10, - limited: true, - }, { - name: "LimiterDeny", - lim: rate.NewLimiter(rate.Limit(6), 6*2), - accepted: 10, - limited: true, - }} { - t.Run(test.name, func(t *testing.T) { - if test.hit > 0 { - assert.True(t, test.lim.AllowN(time.Now(), test.hit)) - } - - var actualResult Result - err := BackendProcessor(&config.Config{MaxEventSize: 100 * 1024}).HandleStream( - context.Background(), test.lim, &model.Metadata{}, bytes.NewReader(payload), nopBatchProcessor{}, - &actualResult, - ) - if test.limited { - assert.Equal(t, ErrRateLimitExceeded, err) - } else { - assert.NoError(t, err) - } - assert.Equal(t, Result{Accepted: test.accepted}, actualResult) - }) - } -} - func makeApproveEventsBatchProcessor(t *testing.T, name string, count *int) model.BatchProcessor { return model.ProcessBatchFunc(func(ctx context.Context, b *model.Batch) error { events := b.Transform(ctx, &transform.Config{DataStreams: true}) From ab082c3f7e8d53f25cba6466797f596fe2af6d34 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 21 Jun 2021 14:58:20 +0800 Subject: [PATCH 2/5] docker-compose: fleet-server depends on kibana (#5496) --- docker-compose.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/docker-compose.yml b/docker-compose.yml index 23a6e341422..5cb435d8ad1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -106,6 +106,7 @@ services: KIBANA_PASSWORD: "${ES_SUPERUSER_PASS:-changeme}" depends_on: elasticsearch: { condition: service_healthy } + kibana: { condition: service_healthy } volumes: - "./testing/docker/fleet-server/certificate.pem:/etc/pki/tls/certs/fleet-server.pem" - "./testing/docker/fleet-server/key.pem:/etc/pki/tls/private/fleet-server-key.pem" From d577ec8085a492f9cf4d0d08f1d5c830fde9b107 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 21 Jun 2021 21:29:27 +0800 Subject: [PATCH 3/5] beater: even more refactoring (#5502) * beater: even more refactoring - rate limiting middleware is now installed for both RUM and backend agent APIs, but only applies for anonymous clients (currently only RUM) - rate limiting middleware now performs an initial Allow check at the request level, for consistent request rate limiting of those endpoints that are rate limited - agent config now restricts "insecure" (RUM) agents on the basis that they are anonymous, rather than being RUM specifically. The list of insecure agent names (those allowed for anonymous auth) is now passed in * make gofmt * beater/api/profile: remove unused field --- beater/api/config/agent/handler.go | 63 +++++------ beater/api/config/agent/handler_test.go | 89 +++++++-------- beater/api/intake/handler.go | 33 ++---- beater/api/intake/handler_test.go | 14 ++- beater/api/mux.go | 86 +++++++++----- beater/api/mux_intake_rum_test.go | 4 +- beater/api/mux_test.go | 7 +- beater/api/profile/handler.go | 20 ++-- beater/api/profile/handler_test.go | 17 +-- .../ratelimit/context.go} | 30 +++-- beater/api/ratelimit/store.go | 17 +-- beater/api/ratelimit/store_test.go | 29 +---- beater/http.go | 14 ++- beater/middleware/authorization_middleware.go | 1 + beater/middleware/rate_limit_middleware.go | 31 +++-- .../middleware/rate_limit_middleware_test.go | 107 ++++++++++++++++++ .../middleware/request_metadata_middleware.go | 46 -------- .../request_metadata_middleware_test.go | 77 ------------- beater/request/context.go | 36 ++---- beater/request/context_test.go | 11 +- beater/server.go | 28 +++-- beater/tracing.go | 14 ++- systemtest/rum_test.go | 7 +- 23 files changed, 403 insertions(+), 378 deletions(-) rename beater/{middleware/rum_middleware.go => api/ratelimit/context.go} (52%) create mode 100644 beater/middleware/rate_limit_middleware_test.go delete mode 100644 beater/middleware/request_metadata_middleware.go delete mode 100644 beater/middleware/request_metadata_middleware_test.go diff --git a/beater/api/config/agent/handler.go b/beater/api/config/agent/handler.go index 04086dcc9d2..f77a39f24fa 100644 --- a/beater/api/config/agent/handler.go +++ b/beater/api/config/agent/handler.go @@ -52,18 +52,21 @@ var ( registry = monitoring.Default.NewRegistry("apm-server.acm") errCacheControl = fmt.Sprintf("max-age=%v, must-revalidate", errMaxAgeDuration.Seconds()) - - // rumAgents keywords (new and old) - rumAgents = []string{"rum-js", "js-base"} ) type handler struct { f agentcfg.Fetcher + allowAnonymousAgents []string cacheControl, defaultServiceEnvironment string } -func NewHandler(f agentcfg.Fetcher, config config.KibanaAgentConfig, defaultServiceEnvironment string) request.Handler { +func NewHandler( + f agentcfg.Fetcher, + config config.KibanaAgentConfig, + defaultServiceEnvironment string, + allowAnonymousAgents []string, +) request.Handler { if f == nil { panic("fetcher must not be nil") } @@ -72,6 +75,7 @@ func NewHandler(f agentcfg.Fetcher, config config.KibanaAgentConfig, defaultServ f: f, cacheControl: cacheControl, defaultServiceEnvironment: defaultServiceEnvironment, + allowAnonymousAgents: allowAnonymousAgents, } return h.Handle @@ -83,13 +87,6 @@ func (h *handler) Handle(c *request.Context) { // error handling c.Header().Set(headers.CacheControl, errCacheControl) - ok := c.RateLimiter == nil || c.RateLimiter.Allow() - if !ok { - c.Result.SetDefault(request.IDResponseErrorsRateLimit) - c.Write() - return - } - query, queryErr := buildQuery(c) if queryErr != nil { extractQueryError(c, queryErr) @@ -100,26 +97,29 @@ func (h *handler) Handle(c *request.Context) { query.Service.Environment = h.defaultServiceEnvironment } - if !c.AuthResult.Anonymous { - // The exact agent is not always known for anonymous clients, so we do not - // issue a secondary authorization check for them. Instead, we issue the - // request and filter the results using query.InsecureAgents. - authResource := authorization.Resource{ServiceName: query.Service.Name} - if result, err := authorization.AuthorizedFor(c.Request.Context(), authResource); err != nil { - c.Result.SetDefault(request.IDResponseErrorsServiceUnavailable) - c.Result.Err = err - c.Write() - return - } else if !result.Authorized { - id := request.IDResponseErrorsUnauthorized - status := request.MapResultIDToStatus[id] - if result.Reason != "" { - status.Keyword = result.Reason - } - c.Result.Set(id, status.Code, status.Keyword, nil, nil) - c.Write() - return + // Only service, and not agent, is known for config queries. + // For anonymous/untrusted agents, we filter the results using + // query.InsecureAgents below. + authResource := authorization.Resource{ServiceName: query.Service.Name} + authResult, err := authorization.AuthorizedFor(c.Request.Context(), authResource) + if err != nil { + c.Result.SetDefault(request.IDResponseErrorsServiceUnavailable) + c.Result.Err = err + c.Write() + return + } + if !authResult.Authorized { + id := request.IDResponseErrorsUnauthorized + status := request.MapResultIDToStatus[id] + if authResult.Reason != "" { + status.Keyword = authResult.Reason } + c.Result.Set(id, status.Code, status.Keyword, nil, nil) + c.Write() + return + } + if authResult.Anonymous { + query.InsecureAgents = h.allowAnonymousAgents } result, err := h.f.Fetch(c.Request.Context(), query) @@ -184,9 +184,6 @@ func buildQuery(c *request.Context) (agentcfg.Query, error) { return query, errors.New(agentcfg.ServiceName + " is required") } - if c.IsRum { - query.InsecureAgents = rumAgents - } query.Etag = ifNoneMatch(c) return query, nil } diff --git a/beater/api/config/agent/handler_test.go b/beater/api/config/agent/handler_test.go index 7dcbc1dc079..7ab065f84d3 100644 --- a/beater/api/config/agent/handler_test.go +++ b/beater/api/config/agent/handler_test.go @@ -32,7 +32,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.elastic.co/apm/apmtest" - "golang.org/x/time/rate" "github.com/elastic/beats/v7/libbeat/common" libkibana "github.com/elastic/beats/v7/libbeat/kibana" @@ -173,18 +172,12 @@ func TestAgentConfigHandler(t *testing.T) { var cfg = config.KibanaAgentConfig{Cache: config.Cache{Expiration: 4 * time.Second}} for _, tc := range testcases { f := agentcfg.NewKibanaFetcher(tc.kbClient, cfg.Cache.Expiration) - h := NewHandler(f, cfg, "") + h := NewHandler(f, cfg, "", nil) r := httptest.NewRequest(tc.method, target(tc.queryParams), nil) for k, v := range tc.requestHeader { r.Header.Set(k, v) } ctx, w := newRequestContext(r) - ctx.AuthResult.Authorized = true - ctx.Request = withAuthorization(ctx.Request, - authorizedForFunc(func(context.Context, authorization.Resource) (authorization.Result, error) { - return authorization.Result{Authorized: true}, nil - }), - ) h(ctx) require.Equal(t, tc.respStatus, w.Code) @@ -202,32 +195,46 @@ func TestAgentConfigHandlerAnonymousAccess(t *testing.T) { kbClient := kibanatest.MockKibana(http.StatusUnauthorized, m{"error": "Unauthorized"}, mockVersion, true) cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}} f := agentcfg.NewKibanaFetcher(kbClient, cfg.Cache.Expiration) - h := NewHandler(f, cfg, "") + h := NewHandler(f, cfg, "", nil) for _, tc := range []struct { - anonymous bool - response string + anonymous bool + response string + authResource *authorization.Resource }{{ - anonymous: false, - response: `{"error":"APM Server is not authorized to query Kibana. Please configure apm-server.kibana.username and apm-server.kibana.password, and ensure the user has the necessary privileges."}`, + anonymous: false, + response: `{"error":"APM Server is not authorized to query Kibana. Please configure apm-server.kibana.username and apm-server.kibana.password, and ensure the user has the necessary privileges."}`, + authResource: &authorization.Resource{ServiceName: "opbeans"}, }, { - anonymous: true, - response: `{"error":"Unauthorized"}`, + anonymous: true, + response: `{"error":"Unauthorized"}`, + authResource: &authorization.Resource{ServiceName: "opbeans"}, }} { r := httptest.NewRequest(http.MethodGet, target(map[string]string{"service.name": "opbeans"}), nil) - ctx, w := newRequestContext(r) - ctx.AuthResult.Authorized = true - ctx.AuthResult.Anonymous = tc.anonymous - ctx.Request = withAuthorization(ctx.Request, authorization.AnonymousAuth{}) - h(ctx) + c, w := newRequestContext(r) + c.AuthResult.Authorized = true + c.AuthResult.Anonymous = tc.anonymous + + var requestedResource *authorization.Resource + c.Request = withAuthorization(c.Request, + authorizedForFunc(func(ctx context.Context, resource authorization.Resource) (authorization.Result, error) { + if requestedResource != nil { + panic("expected only one AuthorizedFor request") + } + requestedResource = &resource + return c.AuthResult, nil + }), + ) + h(c) assert.Equal(t, tc.response+"\n", w.Body.String()) + assert.Equal(t, tc.authResource, requestedResource) } } func TestAgentConfigHandlerAuthorizedForService(t *testing.T) { cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}} f := agentcfg.NewKibanaFetcher(nil, cfg.Cache.Expiration) - h := NewHandler(f, cfg, "") + h := NewHandler(f, cfg, "", nil) r := httptest.NewRequest(http.MethodGet, target(map[string]string{"service.name": "opbeans"}), nil) ctx, w := newRequestContext(r) @@ -249,7 +256,7 @@ func TestAgentConfigHandlerAuthorizedForService(t *testing.T) { func TestAgentConfigHandler_NoKibanaClient(t *testing.T) { cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}} f := agentcfg.NewKibanaFetcher(nil, cfg.Cache.Expiration) - h := NewHandler(f, cfg, "") + h := NewHandler(f, cfg, "", nil) w := sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{ "service": m{"name": "opbeans-node"}}))) @@ -268,7 +275,7 @@ func TestAgentConfigHandler_PostOk(t *testing.T) { var cfg = config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}} f := agentcfg.NewKibanaFetcher(kb, cfg.Cache.Expiration) - h := NewHandler(f, cfg, "") + h := NewHandler(f, cfg, "", nil) w := sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{ "service": m{"name": "opbeans-node"}}))) @@ -289,7 +296,7 @@ func TestAgentConfigHandler_DefaultServiceEnvironment(t *testing.T) { var cfg = config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}} f := agentcfg.NewKibanaFetcher(kb, cfg.Cache.Expiration) - h := NewHandler(f, cfg, "default") + h := NewHandler(f, cfg, "default", nil) sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{"service": m{"name": "opbeans-node", "environment": "specified"}}))) sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{"service": m{"name": "opbeans-node"}}))) @@ -306,8 +313,6 @@ func TestAgentConfigRum(t *testing.T) { r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{ "service": m{"name": "opbeans"}})) ctx, w := newRequestContext(r) - ctx.IsRum = true - ctx.AuthResult.Anonymous = true h(ctx) var actual map[string]string json.Unmarshal(w.Body.Bytes(), &actual) @@ -320,8 +325,6 @@ func TestAgentConfigRumEtag(t *testing.T) { h := getHandler("rum-js") r := httptest.NewRequest(http.MethodGet, "/rum?ifnonematch=123&service.name=opbeans", nil) ctx, w := newRequestContext(r) - ctx.IsRum = true - ctx.AuthResult.Anonymous = true h(ctx) assert.Equal(t, http.StatusNotModified, w.Code, w.Body.String()) } @@ -333,7 +336,7 @@ func TestAgentConfigNotRum(t *testing.T) { ctx, w := newRequestContext(r) ctx.Request = withAuthorization(ctx.Request, authorizedForFunc(func(context.Context, authorization.Resource) (authorization.Result, error) { - return authorization.Result{Authorized: true}, nil + return authorization.Result{Authorized: true, Anonymous: false}, nil }), ) h(ctx) @@ -348,8 +351,6 @@ func TestAgentConfigNoLeak(t *testing.T) { r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{ "service": m{"name": "opbeans"}})) ctx, w := newRequestContext(r) - ctx.IsRum = true - ctx.AuthResult.Anonymous = true h(ctx) var actual map[string]string json.Unmarshal(w.Body.Bytes(), &actual) @@ -357,21 +358,6 @@ func TestAgentConfigNoLeak(t *testing.T) { assert.Equal(t, map[string]string{}, actual) } -func TestAgentConfigRateLimit(t *testing.T) { - h := getHandler("rum-js") - r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{ - "service": m{"name": "opbeans"}})) - ctx, w := newRequestContext(r) - ctx.IsRum = true - ctx.RateLimiter = rate.NewLimiter(rate.Limit(0), 0) - ctx.AuthResult.Anonymous = true - h(ctx) - var actual map[string]string - json.Unmarshal(w.Body.Bytes(), &actual) - assert.Equal(t, http.StatusTooManyRequests, w.Code, w.Body.String()) - assert.Equal(t, map[string]string{"error": "too many requests"}, actual) -} - func getHandler(agent string) request.Handler { kb := kibanatest.MockKibana(http.StatusOK, m{ "_id": "1", @@ -386,7 +372,7 @@ func getHandler(agent string) request.Handler { }, mockVersion, true) cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}} f := agentcfg.NewKibanaFetcher(kb, cfg.Cache.Expiration) - return NewHandler(f, cfg, "") + return NewHandler(f, cfg, "", []string{"rum-js"}) } func TestIfNoneMatch(t *testing.T) { @@ -412,7 +398,7 @@ func TestAgentConfigTraceContext(t *testing.T) { client := kibana.NewConnectingClient(&kibanaCfg) cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: 5 * time.Minute}} f := agentcfg.NewKibanaFetcher(client, cfg.Cache.Expiration) - handler := NewHandler(f, cfg, "default") + handler := NewHandler(f, cfg, "default", nil) _, spans, _ := apmtest.WithTransaction(func(ctx context.Context) { // When the handler is called with a context containing // a transaction, the underlying Kibana query should create a span @@ -439,6 +425,7 @@ func newRequestContext(r *http.Request) (*request.Context, *httptest.ResponseRec w := httptest.NewRecorder() ctx := request.NewContext() ctx.Reset(w, r) + ctx.Request = withAnonymousAuthorization(ctx.Request) return ctx, w } @@ -471,6 +458,12 @@ func (c *recordingKibanaClient) Send(ctx context.Context, method string, path st return c.Client.Send(ctx, method, path, params, header, body) } +func withAnonymousAuthorization(req *http.Request) *http.Request { + return withAuthorization(req, authorizedForFunc(func(context.Context, authorization.Resource) (authorization.Result, error) { + return authorization.Result{Authorized: true, Anonymous: true}, nil + })) +} + func withAuthorization(req *http.Request, auth authorization.Authorization) *http.Request { return req.WithContext(authorization.ContextWithAuthorization(req.Context(), auth)) } diff --git a/beater/api/intake/handler.go b/beater/api/intake/handler.go index 2bfb47b7037..3d9a4d54937 100644 --- a/beater/api/intake/handler.go +++ b/beater/api/intake/handler.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/monitoring" + "github.com/elastic/apm-server/beater/api/ratelimit" "github.com/elastic/apm-server/beater/headers" "github.com/elastic/apm-server/beater/request" "github.com/elastic/apm-server/decoder" @@ -52,7 +53,6 @@ var ( errMethodNotAllowed = errors.New("only POST requests are supported") errServerShuttingDown = errors.New("server is shutting down") errInvalidContentType = errors.New("invalid content type") - errRateLimitExceeded = errors.New("rate limit exceeded") ) // StreamHandler is an interface for handling an Elastic APM agent ND-JSON event @@ -68,27 +68,23 @@ type StreamHandler interface { ) error } +// RequestMetadataFunc is a function type supplied to Handler for extracting +// metadata from the request. This is used for conditionally injecting the +// source IP address as `client.ip` for RUM. +type RequestMetadataFunc func(*request.Context) model.Metadata + // Handler returns a request.Handler for managing intake requests for backend and rum events. -func Handler(handler StreamHandler, batchProcessor model.BatchProcessor) request.Handler { +func Handler(handler StreamHandler, requestMetadataFunc RequestMetadataFunc, batchProcessor model.BatchProcessor) request.Handler { return func(c *request.Context) { if err := validateRequest(c); err != nil { writeError(c, err) return } - if c.RateLimiter != nil { - // Call Allow once for each stream to avoid burning CPU before we - // begin reading for the first time. This prevents clients from - // repeatedly connecting and sending < batchSize events and - // disconnecting before being rate limited. - if !c.RateLimiter.Allow() { - writeError(c, errRateLimitExceeded) - return - } - + if limiter, ok := ratelimit.FromContext(c.Request.Context()); ok { // Apply rate limiting after reading but before processing any events. batchProcessor = modelprocessor.Chained{ - rateLimitBatchProcessor(c.RateLimiter, batchSize), + rateLimitBatchProcessor(limiter, batchSize), batchProcessor, } } @@ -99,12 +95,7 @@ func Handler(handler StreamHandler, batchProcessor model.BatchProcessor) request return } - metadata := model.Metadata{ - UserAgent: model.UserAgent{Original: c.RequestMetadata.UserAgent}, - Client: model.Client{IP: c.RequestMetadata.ClientIP}, - System: model.System{IP: c.RequestMetadata.SystemIP}, - } - + metadata := requestMetadataFunc(c) var result stream.Result if err := handler.HandleStream( c.Request.Context(), @@ -132,7 +123,7 @@ func rateLimitBatch(ctx context.Context, limiter *rate.Limiter, batchSize int) e ctx, cancel := context.WithTimeout(ctx, rateLimitTimeout) defer cancel() if err := limiter.WaitN(ctx, batchSize); err != nil { - return errRateLimitExceeded + return ratelimit.ErrRateLimitExceeded } return nil } @@ -191,7 +182,7 @@ func writeStreamResult(c *request.Context, sr *stream.Result) { errID = request.IDResponseErrorsMethodNotAllowed case errors.Is(err, errInvalidContentType): errID = request.IDResponseErrorsValidate - case errors.Is(err, errRateLimitExceeded): + case errors.Is(err, ratelimit.ErrRateLimitExceeded): errID = request.IDResponseErrorsRateLimit } } diff --git a/beater/api/intake/handler_test.go b/beater/api/intake/handler_test.go index 5a4b7cf1b4d..ed4074520f4 100644 --- a/beater/api/intake/handler_test.go +++ b/beater/api/intake/handler_test.go @@ -34,6 +34,7 @@ import ( "golang.org/x/time/rate" "github.com/elastic/apm-server/approvaltest" + "github.com/elastic/apm-server/beater/api/ratelimit" "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/beater/headers" "github.com/elastic/apm-server/beater/request" @@ -136,7 +137,7 @@ func TestIntakeHandler(t *testing.T) { tc.setup(t) // call handler - h := Handler(tc.processor, tc.batchProcessor) + h := Handler(tc.processor, emptyRequestMetadata, tc.batchProcessor) h(tc.c) require.Equal(t, string(tc.id), string(tc.c.Result.ID)) @@ -190,12 +191,15 @@ func TestRateLimiting(t *testing.T) { var tc testcaseIntakeHandler tc.path = "ratelimit.ndjson" tc.setup(t) - tc.c.RateLimiter = test.limiter + + tc.c.Request = tc.c.Request.WithContext( + ratelimit.ContextWithLimiter(tc.c.Request.Context(), test.limiter), + ) if test.preconsumed > 0 { test.limiter.AllowN(time.Now(), test.preconsumed) } - h := Handler(tc.processor, tc.batchProcessor) + h := Handler(tc.processor, emptyRequestMetadata, tc.batchProcessor) h(tc.c) if test.expectLimited { @@ -275,3 +279,7 @@ func compressedRequest(t *testing.T, compressionType string, compressPayload boo req.Header.Set(headers.ContentEncoding, compressionType) return req } + +func emptyRequestMetadata(*request.Context) model.Metadata { + return model.Metadata{} +} diff --git a/beater/api/mux.go b/beater/api/mux.go index 55e9c1ee73f..436a96d515c 100644 --- a/beater/api/mux.go +++ b/beater/api/mux.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/apm-server/beater/api/config/agent" "github.com/elastic/apm-server/beater/api/intake" "github.com/elastic/apm-server/beater/api/profile" + "github.com/elastic/apm-server/beater/api/ratelimit" "github.com/elastic/apm-server/beater/api/root" "github.com/elastic/apm-server/beater/authorization" "github.com/elastic/apm-server/beater/config" @@ -68,6 +69,13 @@ const ( IntakeRUMV3Path = "/intake/v3/rum/events" ) +var ( + // rumAgents holds the current and previous agent names for the + // RUM JavaScript agent. This is used for restricting which config + // is supplied to anonymous agents. + rumAgents = []string{"rum-js", "js-base"} +) + // NewMux registers apm handlers to paths building up the APM Server API. func NewMux( beatInfo beat.Info, @@ -75,6 +83,7 @@ func NewMux( report publish.Reporter, batchProcessor model.BatchProcessor, fetcher agentcfg.Fetcher, + ratelimitStore *ratelimit.Store, ) (*http.ServeMux, error) { pool := request.NewContextPool() mux := http.NewServeMux() @@ -91,6 +100,7 @@ func NewMux( authBuilder: auth, reporter: report, batchProcessor: batchProcessor, + ratelimitStore: ratelimitStore, } type route struct { @@ -140,33 +150,46 @@ type routeBuilder struct { authBuilder *authorization.Builder reporter publish.Reporter batchProcessor model.BatchProcessor + ratelimitStore *ratelimit.Store } func (r *routeBuilder) profileHandler() (request.Handler, error) { - h := profile.Handler(r.batchProcessor) + requestMetadataFunc := emptyRequestMetadata + if r.cfg.AugmentEnabled { + requestMetadataFunc = backendRequestMetadata + } + h := profile.Handler(requestMetadataFunc, r.batchProcessor) authHandler := r.authBuilder.ForPrivilege(authorization.PrivilegeEventWrite.Action) - return middleware.Wrap(h, backendMiddleware(r.cfg, authHandler, profile.MonitoringMap)...) + return middleware.Wrap(h, backendMiddleware(r.cfg, authHandler, r.ratelimitStore, profile.MonitoringMap)...) } func (r *routeBuilder) backendIntakeHandler() (request.Handler, error) { - h := intake.Handler(stream.BackendProcessor(r.cfg), r.batchProcessor) + requestMetadataFunc := emptyRequestMetadata + if r.cfg.AugmentEnabled { + requestMetadataFunc = backendRequestMetadata + } + h := intake.Handler(stream.BackendProcessor(r.cfg), requestMetadataFunc, r.batchProcessor) authHandler := r.authBuilder.ForPrivilege(authorization.PrivilegeEventWrite.Action) - return middleware.Wrap(h, backendMiddleware(r.cfg, authHandler, intake.MonitoringMap)...) + return middleware.Wrap(h, backendMiddleware(r.cfg, authHandler, r.ratelimitStore, intake.MonitoringMap)...) } func (r *routeBuilder) rumIntakeHandler(newProcessor func(*config.Config) *stream.Processor) func() (request.Handler, error) { + requestMetadataFunc := emptyRequestMetadata + if r.cfg.AugmentEnabled { + requestMetadataFunc = rumRequestMetadata + } return func() (request.Handler, error) { batchProcessor := r.batchProcessor batchProcessor = batchProcessorWithAllowedServiceNames(batchProcessor, r.cfg.RumConfig.AllowServiceNames) - h := intake.Handler(newProcessor(r.cfg), batchProcessor) - return middleware.Wrap(h, rumMiddleware(r.cfg, nil, intake.MonitoringMap)...) + h := intake.Handler(newProcessor(r.cfg), requestMetadataFunc, batchProcessor) + return middleware.Wrap(h, rumMiddleware(r.cfg, nil, r.ratelimitStore, intake.MonitoringMap)...) } } func (r *routeBuilder) sourcemapHandler() (request.Handler, error) { h := sourcemap.Handler(r.reporter) authHandler := r.authBuilder.ForPrivilege(authorization.PrivilegeSourcemapWrite.Action) - return middleware.Wrap(h, sourcemapMiddleware(r.cfg, authHandler)...) + return middleware.Wrap(h, sourcemapMiddleware(r.cfg, authHandler, r.ratelimitStore)...) } func (r *routeBuilder) rootHandler() (request.Handler, error) { @@ -177,26 +200,27 @@ func (r *routeBuilder) rootHandler() (request.Handler, error) { func (r *routeBuilder) backendAgentConfigHandler(f agentcfg.Fetcher) func() (request.Handler, error) { return func() (request.Handler, error) { authHandler := r.authBuilder.ForPrivilege(authorization.PrivilegeAgentConfigRead.Action) - return agentConfigHandler(r.cfg, authHandler, backendMiddleware, f) + return agentConfigHandler(r.cfg, authHandler, r.ratelimitStore, backendMiddleware, f) } } func (r *routeBuilder) rumAgentConfigHandler(f agentcfg.Fetcher) func() (request.Handler, error) { return func() (request.Handler, error) { - return agentConfigHandler(r.cfg, nil, rumMiddleware, f) + return agentConfigHandler(r.cfg, nil, r.ratelimitStore, rumMiddleware, f) } } -type middlewareFunc func(*config.Config, *authorization.Handler, map[request.ResultID]*monitoring.Int) []middleware.Middleware +type middlewareFunc func(*config.Config, *authorization.Handler, *ratelimit.Store, map[request.ResultID]*monitoring.Int) []middleware.Middleware func agentConfigHandler( cfg *config.Config, authHandler *authorization.Handler, + ratelimitStore *ratelimit.Store, middlewareFunc middlewareFunc, f agentcfg.Fetcher, ) (request.Handler, error) { - mw := middlewareFunc(cfg, authHandler, agent.MonitoringMap) - h := agent.NewHandler(f, cfg.KibanaAgentConfig, cfg.DefaultServiceEnvironment) + mw := middlewareFunc(cfg, authHandler, ratelimitStore, agent.MonitoringMap) + h := agent.NewHandler(f, cfg.KibanaAgentConfig, cfg.DefaultServiceEnvironment, rumAgents) if !cfg.Kibana.Enabled && cfg.AgentConfigs == nil { msg := "Agent remote configuration is disabled. " + @@ -219,37 +243,30 @@ func apmMiddleware(m map[request.ResultID]*monitoring.Int) []middleware.Middlewa } } -func backendMiddleware(cfg *config.Config, auth *authorization.Handler, m map[request.ResultID]*monitoring.Int) []middleware.Middleware { +func backendMiddleware(cfg *config.Config, auth *authorization.Handler, ratelimitStore *ratelimit.Store, m map[request.ResultID]*monitoring.Int) []middleware.Middleware { backendMiddleware := append(apmMiddleware(m), middleware.ResponseHeadersMiddleware(cfg.ResponseHeaders), middleware.AuthorizationMiddleware(auth, true), + middleware.AnonymousRateLimitMiddleware(ratelimitStore), ) - if cfg.AugmentEnabled { - backendMiddleware = append(backendMiddleware, middleware.SystemMetadataMiddleware()) - } return backendMiddleware } -func rumMiddleware(cfg *config.Config, _ *authorization.Handler, m map[request.ResultID]*monitoring.Int) []middleware.Middleware { +func rumMiddleware(cfg *config.Config, auth *authorization.Handler, ratelimitStore *ratelimit.Store, m map[request.ResultID]*monitoring.Int) []middleware.Middleware { msg := "RUM endpoint is disabled. " + "Configure the `apm-server.rum` section in apm-server.yml to enable ingestion of RUM events. " + "If you are not using the RUM agent, you can safely ignore this error." rumMiddleware := append(apmMiddleware(m), middleware.ResponseHeadersMiddleware(cfg.ResponseHeaders), middleware.ResponseHeadersMiddleware(cfg.RumConfig.ResponseHeaders), - middleware.SetRumFlagMiddleware(), - middleware.SetIPRateLimitMiddleware(cfg.RumConfig.EventRate), middleware.CORSMiddleware(cfg.RumConfig.AllowOrigins, cfg.RumConfig.AllowHeaders), middleware.AnonymousAuthorizationMiddleware(), - middleware.KillSwitchMiddleware(cfg.RumConfig.Enabled, msg), + middleware.AnonymousRateLimitMiddleware(ratelimitStore), ) - if cfg.AugmentEnabled { - rumMiddleware = append(rumMiddleware, middleware.UserMetadataMiddleware()) - } - return rumMiddleware + return append(rumMiddleware, middleware.KillSwitchMiddleware(cfg.RumConfig.Enabled, msg)) } -func sourcemapMiddleware(cfg *config.Config, auth *authorization.Handler) []middleware.Middleware { +func sourcemapMiddleware(cfg *config.Config, auth *authorization.Handler, ratelimitStore *ratelimit.Store) []middleware.Middleware { msg := "Sourcemap upload endpoint is disabled. " + "Configure the `apm-server.rum` section in apm-server.yml to enable sourcemap uploads. " + "If you are not using the RUM agent, you can safely ignore this error." @@ -257,8 +274,8 @@ func sourcemapMiddleware(cfg *config.Config, auth *authorization.Handler) []midd msg = "When APM Server is managed by Fleet, Sourcemaps must be uploaded directly to Elasticsearch." } enabled := cfg.RumConfig.Enabled && cfg.RumConfig.SourceMapping.Enabled && !cfg.DataStreams.Enabled - return append(backendMiddleware(cfg, auth, sourcemap.MonitoringMap), - middleware.KillSwitchMiddleware(enabled, msg)) + backendMiddleware := backendMiddleware(cfg, auth, ratelimitStore, sourcemap.MonitoringMap) + return append(backendMiddleware, middleware.KillSwitchMiddleware(enabled, msg)) } func rootMiddleware(cfg *config.Config, auth *authorization.Handler) []middleware.Middleware { @@ -288,3 +305,18 @@ func batchProcessorWithAllowedServiceNames(p model.BatchProcessor, allowedServic } return modelprocessor.Chained{restrictServiceName, p} } + +func emptyRequestMetadata(c *request.Context) model.Metadata { + return model.Metadata{} +} + +func backendRequestMetadata(c *request.Context) model.Metadata { + return model.Metadata{System: model.System{IP: c.SourceIP}} +} + +func rumRequestMetadata(c *request.Context) model.Metadata { + return model.Metadata{ + Client: model.Client{IP: c.SourceIP}, + UserAgent: model.UserAgent{Original: c.UserAgent}, + } +} diff --git a/beater/api/mux_intake_rum_test.go b/beater/api/mux_intake_rum_test.go index 5bf99c1ee44..2f5f728a5b2 100644 --- a/beater/api/mux_intake_rum_test.go +++ b/beater/api/mux_intake_rum_test.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/apm-server/approvaltest" "github.com/elastic/apm-server/beater/api/intake" + "github.com/elastic/apm-server/beater/api/ratelimit" "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/beater/headers" "github.com/elastic/apm-server/beater/middleware" @@ -36,6 +37,7 @@ import ( ) func TestOPTIONS(t *testing.T) { + ratelimitStore, _ := ratelimit.NewStore(1, 1, 1) requestTaken := make(chan struct{}, 1) done := make(chan struct{}, 1) @@ -46,7 +48,7 @@ func TestOPTIONS(t *testing.T) { requestTaken <- struct{}{} <-done }, - rumMiddleware(cfg, nil, intake.MonitoringMap)...) + rumMiddleware(cfg, nil, ratelimitStore, intake.MonitoringMap)...) // use this to block the single allowed concurrent requests go func() { diff --git a/beater/api/mux_test.go b/beater/api/mux_test.go index 8ce997c1eda..7b6be8969e8 100644 --- a/beater/api/mux_test.go +++ b/beater/api/mux_test.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/apm-server/agentcfg" "github.com/elastic/apm-server/approvaltest" + "github.com/elastic/apm-server/beater/api/ratelimit" "github.com/elastic/apm-server/beater/beatertest" "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/beater/request" @@ -76,7 +77,8 @@ func requestToMuxerWithHeaderAndQueryString( func requestToMuxer(cfg *config.Config, r *http.Request) (*httptest.ResponseRecorder, error) { nopReporter := func(context.Context, publish.PendingReq) error { return nil } nopBatchProcessor := model.ProcessBatchFunc(func(context.Context, *model.Batch) error { return nil }) - mux, err := NewMux(beat.Info{Version: "1.2.3"}, cfg, nopReporter, nopBatchProcessor, agentcfg.NewFetcher(cfg)) + ratelimitStore, _ := ratelimit.NewStore(1000, 1000, 1000) + mux, err := NewMux(beat.Info{Version: "1.2.3"}, cfg, nopReporter, nopBatchProcessor, agentcfg.NewFetcher(cfg), ratelimitStore) if err != nil { return nil, err } @@ -111,7 +113,8 @@ func testMonitoringMiddleware(t *testing.T, urlPath string, monitoringMap map[re func newTestMux(t *testing.T, cfg *config.Config) http.Handler { nopReporter := func(context.Context, publish.PendingReq) error { return nil } nopBatchProcessor := model.ProcessBatchFunc(func(context.Context, *model.Batch) error { return nil }) - mux, err := NewMux(beat.Info{Version: "1.2.3"}, cfg, nopReporter, nopBatchProcessor, agentcfg.NewFetcher(cfg)) + ratelimitStore, _ := ratelimit.NewStore(1000, 1000, 1000) + mux, err := NewMux(beat.Info{Version: "1.2.3"}, cfg, nopReporter, nopBatchProcessor, agentcfg.NewFetcher(cfg), ratelimitStore) require.NoError(t, err) return mux } diff --git a/beater/api/profile/handler.go b/beater/api/profile/handler.go index bf1c10f7617..e1f30da55fd 100644 --- a/beater/api/profile/handler.go +++ b/beater/api/profile/handler.go @@ -55,8 +55,13 @@ const ( profileContentLengthLimit = 10 * 1024 * 1024 ) +// RequestMetadataFunc is a function type supplied to Handler for extracting +// metadata from the request. This is used for conditionally injecting the +// source IP address as `client.ip` for RUM. +type RequestMetadataFunc func(*request.Context) model.Metadata + // Handler returns a request.Handler for managing profile requests. -func Handler(processor model.BatchProcessor) request.Handler { +func Handler(requestMetadataFunc RequestMetadataFunc, processor model.BatchProcessor) request.Handler { handle := func(c *request.Context) (*result, error) { if c.Request.Method != http.MethodPost { return nil, requestError{ @@ -71,14 +76,6 @@ func Handler(processor model.BatchProcessor) request.Handler { } } - ok := c.RateLimiter == nil || c.RateLimiter.Allow() - if !ok { - return nil, requestError{ - id: request.IDResponseErrorsRateLimit, - err: errors.New("rate limit exceeded"), - } - } - var totalLimitRemaining int64 = profileContentLengthLimit var profiles []*pprof_profile.Profile var profileMetadata model.Metadata @@ -104,10 +101,7 @@ func Handler(processor model.BatchProcessor) request.Handler { } r := &decoder.LimitedReader{R: part, N: metadataContentLengthLimit} dec := decoder.NewJSONDecoder(r) - metadata := model.Metadata{ - UserAgent: model.UserAgent{Original: c.RequestMetadata.UserAgent}, - Client: model.Client{IP: c.RequestMetadata.ClientIP}, - System: model.System{IP: c.RequestMetadata.SystemIP}} + metadata := requestMetadataFunc(c) if err := v2.DecodeMetadata(dec, &metadata); err != nil { if r.N < 0 { return nil, requestError{ diff --git a/beater/api/profile/handler_test.go b/beater/api/profile/handler_test.go index 9a388eea4b3..0fa0f80eb52 100644 --- a/beater/api/profile/handler_test.go +++ b/beater/api/profile/handler_test.go @@ -31,7 +31,6 @@ import ( "strings" "testing" - "github.com/elastic/apm-server/beater/api/ratelimit" "github.com/elastic/apm-server/model" "github.com/stretchr/testify/assert" @@ -45,8 +44,6 @@ import ( const pprofContentType = `application/x-protobuf; messageType="perftools.profiles.Profile"` func TestHandler(t *testing.T) { - var rateLimit, err = ratelimit.NewStore(1, 0, 0) - require.NoError(t, err) for name, tc := range map[string]testcaseIntakeHandler{ "MethodNotAllowed": { r: httptest.NewRequest(http.MethodGet, "/", nil), @@ -60,10 +57,6 @@ func TestHandler(t *testing.T) { }(), id: request.IDResponseErrorsValidate, }, - "RateLimitExceeded": { - rateLimit: rateLimit, - id: request.IDResponseErrorsRateLimit, - }, "Closing": { batchProcessor: func(t *testing.T) model.BatchProcessor { return model.ProcessBatchFunc(func(context.Context, *model.Batch) error { @@ -199,10 +192,7 @@ func TestHandler(t *testing.T) { } { t.Run(name, func(t *testing.T) { tc.setup(t) - if tc.rateLimit != nil { - tc.c.RateLimiter = tc.rateLimit.ForIP(&http.Request{}) - } - Handler(tc.batchProcessor(t))(tc.c) + Handler(emptyRequestMetadata, tc.batchProcessor(t))(tc.c) assert.Equal(t, string(tc.id), string(tc.c.Result.ID)) resultStatus := request.MapResultIDToStatus[tc.id] @@ -225,7 +215,6 @@ type testcaseIntakeHandler struct { c *request.Context w *httptest.ResponseRecorder r *http.Request - rateLimit *ratelimit.Store batchProcessor func(t *testing.T) model.BatchProcessor reports int parts []part @@ -299,3 +288,7 @@ func prettyJSON(v interface{}) string { enc.Encode(v) return buf.String() } + +func emptyRequestMetadata(*request.Context) model.Metadata { + return model.Metadata{} +} diff --git a/beater/middleware/rum_middleware.go b/beater/api/ratelimit/context.go similarity index 52% rename from beater/middleware/rum_middleware.go rename to beater/api/ratelimit/context.go index be3f8f33d4d..d17f99597de 100644 --- a/beater/middleware/rum_middleware.go +++ b/beater/api/ratelimit/context.go @@ -15,18 +15,28 @@ // specific language governing permissions and limitations // under the License. -package middleware +package ratelimit import ( - "github.com/elastic/apm-server/beater/request" + "context" + + "github.com/pkg/errors" + "golang.org/x/time/rate" ) -// SetRumFlagMiddleware sets a rum flag in the context -func SetRumFlagMiddleware() Middleware { - return func(h request.Handler) (request.Handler, error) { - return func(c *request.Context) { - c.IsRum = true - h(c) - }, nil - } +// ErrRateLimitExceeded is returned when the rate limit is exceeded. +var ErrRateLimitExceeded = errors.New("rate limit exceeded") + +type rateLimiterKey struct{} + +// FromContext returns a rate.Limiter if one is contained in ctx, +// and a bool indicating whether one was found. +func FromContext(ctx context.Context) (*rate.Limiter, bool) { + limiter, ok := ctx.Value(rateLimiterKey{}).(*rate.Limiter) + return limiter, ok +} + +// ContextWithLimiter returns a copy of parent associated with limiter. +func ContextWithLimiter(parent context.Context, limiter *rate.Limiter) context.Context { + return context.WithValue(parent, rateLimiterKey{}, limiter) } diff --git a/beater/api/ratelimit/store.go b/beater/api/ratelimit/store.go index b9fac6c8cfd..660a3af76cd 100644 --- a/beater/api/ratelimit/store.go +++ b/beater/api/ratelimit/store.go @@ -18,11 +18,9 @@ package ratelimit import ( - "net/http" + "net" "sync" - "github.com/elastic/apm-server/utility" - "github.com/hashicorp/golang-lru/simplelru" "github.com/pkg/errors" "golang.org/x/time/rate" @@ -63,8 +61,9 @@ func NewStore(size, rateLimit, burstFactor int) (*Store, error) { return &store, nil } -// acquire returns a rate.Limiter instance for the given key -func (s *Store) acquire(key string) *rate.Limiter { +// ForIP returns a rate limiter for the given IP. +func (s *Store) ForIP(ip net.IP) *rate.Limiter { + key := ip.String() // lock get and add action for cache to allow proper eviction handling without // race conditions. @@ -83,11 +82,3 @@ func (s *Store) acquire(key string) *rate.Limiter { } return limiter } - -// ForIP returns a rate limiter for the given request IP -func (s *Store) ForIP(r *http.Request) *rate.Limiter { - if s == nil { - return nil - } - return s.acquire(utility.RemoteAddr(r)) -} diff --git a/beater/api/ratelimit/store_test.go b/beater/api/ratelimit/store_test.go index e6559e089f0..4706e9dbff8 100644 --- a/beater/api/ratelimit/store_test.go +++ b/beater/api/ratelimit/store_test.go @@ -18,7 +18,7 @@ package ratelimit import ( - "net/http" + "net" "testing" "time" @@ -38,7 +38,6 @@ func TestCacheInitFails(t *testing.T) { c, err := NewStore(test.size, test.limit, 3) assert.Error(t, err) assert.Nil(t, c) - assert.Nil(t, c.ForIP(&http.Request{})) } } @@ -50,20 +49,20 @@ func TestCacheEviction(t *testing.T) { require.NoError(t, err) // add new limiter - rlA := store.acquire("a") + rlA := store.ForIP(net.ParseIP("127.0.0.1")) rlA.AllowN(time.Now(), 3) // add new limiter - rlB := store.acquire("b") + rlB := store.ForIP(net.ParseIP("127.0.0.2")) rlB.AllowN(time.Now(), 2) // reuse evicted limiter rlA - rlC := store.acquire("c") + rlC := store.ForIP(net.ParseIP("127.0.0.3")) assert.False(t, rlC.Allow()) assert.Equal(t, rlC, store.evictedLimiter) // reuse evicted limiter rlB - rlD := store.acquire("a") + rlD := store.ForIP(net.ParseIP("127.0.0.1")) assert.True(t, rlD.Allow()) assert.False(t, rlD.Allow()) assert.Equal(t, rlD, store.evictedLimiter) @@ -77,22 +76,6 @@ func TestCacheEviction(t *testing.T) { func TestCacheOk(t *testing.T) { store, err := NewStore(1, 1, 1) require.NoError(t, err) - limiter := store.acquire("a") + limiter := store.ForIP(net.ParseIP("127.0.0.1")) assert.NotNil(t, limiter) } - -func TestRateLimitPerIP(t *testing.T) { - store, err := NewStore(2, 1, 1) - require.NoError(t, err) - - var reqFrom = func(ip string) *http.Request { - r := http.Request{} - r.Header = http.Header{} - r.Header.Set("X-Real-Ip", ip) - return &r - } - assert.True(t, store.ForIP(reqFrom("10.10.10.1")).Allow()) - assert.False(t, store.ForIP(reqFrom("10.10.10.1")).Allow()) - assert.True(t, store.ForIP(reqFrom("10.10.10.2")).Allow()) - assert.False(t, store.ForIP(reqFrom("10.10.10.3")).Allow()) -} diff --git a/beater/http.go b/beater/http.go index 994e4b396ed..d7bcb77bab5 100644 --- a/beater/http.go +++ b/beater/http.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/apm-server/agentcfg" "github.com/elastic/apm-server/beater/api" + "github.com/elastic/apm-server/beater/api/ratelimit" "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/model/modelprocessor" @@ -47,7 +48,16 @@ type httpServer struct { grpcListener net.Listener } -func newHTTPServer(logger *logp.Logger, info beat.Info, cfg *config.Config, tracer *apm.Tracer, reporter publish.Reporter, batchProcessor model.BatchProcessor, f agentcfg.Fetcher) (*httpServer, error) { +func newHTTPServer( + logger *logp.Logger, + info beat.Info, + cfg *config.Config, + tracer *apm.Tracer, + reporter publish.Reporter, + batchProcessor model.BatchProcessor, + agentcfgFetcher agentcfg.Fetcher, + ratelimitStore *ratelimit.Store, +) (*httpServer, error) { // Add a model processor that checks authorization for the agent and service for each event. batchProcessor = modelprocessor.Chained{ @@ -55,7 +65,7 @@ func newHTTPServer(logger *logp.Logger, info beat.Info, cfg *config.Config, trac batchProcessor, } - mux, err := api.NewMux(info, cfg, reporter, batchProcessor, f) + mux, err := api.NewMux(info, cfg, reporter, batchProcessor, agentcfgFetcher, ratelimitStore) if err != nil { return nil, err } diff --git a/beater/middleware/authorization_middleware.go b/beater/middleware/authorization_middleware.go index 2de8adb25f1..724b87881d4 100644 --- a/beater/middleware/authorization_middleware.go +++ b/beater/middleware/authorization_middleware.go @@ -65,6 +65,7 @@ func AnonymousAuthorizationMiddleware() Middleware { return func(h request.Handler) (request.Handler, error) { return func(c *request.Context) { auth := authorization.AnonymousAuth{} + c.AuthResult = authorization.Result{Authorized: true, Anonymous: true} c.Request = c.Request.WithContext(authorization.ContextWithAuthorization(c.Request.Context(), auth)) h(c) }, nil diff --git a/beater/middleware/rate_limit_middleware.go b/beater/middleware/rate_limit_middleware.go index 8b71f65cfe6..cb954432517 100644 --- a/beater/middleware/rate_limit_middleware.go +++ b/beater/middleware/rate_limit_middleware.go @@ -19,20 +19,33 @@ package middleware import ( "github.com/elastic/apm-server/beater/api/ratelimit" - "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/beater/request" ) -const burstMultiplier = 3 - -// SetIPRateLimitMiddleware sets a rate limiter -func SetIPRateLimitMiddleware(cfg config.EventRate) Middleware { - store, err := ratelimit.NewStore(cfg.LruSize, cfg.Limit, burstMultiplier) - +// AnonymousRateLimitMiddleware adds a rate.Limiter to the context of anonymous +// requests, first ensuring the client is allowed to perform a single event and +// responding with 429 Too Many Requests if it is not. +// +// This middleware must be wrapped by AuthorizationMiddleware, as it depends on +// the value of c.AuthResult.Anonymous. +func AnonymousRateLimitMiddleware(store *ratelimit.Store) Middleware { return func(h request.Handler) (request.Handler, error) { return func(c *request.Context) { - c.RateLimiter = store.ForIP(c.Request) + if c.AuthResult.Anonymous { + limiter := store.ForIP(c.SourceIP) + if !limiter.Allow() { + c.Result.SetWithError( + request.IDResponseErrorsRateLimit, + ratelimit.ErrRateLimitExceeded, + ) + c.Write() + return + } + ctx := c.Request.Context() + ctx = ratelimit.ContextWithLimiter(ctx, limiter) + c.Request = c.Request.WithContext(ctx) + } h(c) - }, err + }, nil } } diff --git a/beater/middleware/rate_limit_middleware_test.go b/beater/middleware/rate_limit_middleware_test.go new file mode 100644 index 00000000000..bd6c9d18b89 --- /dev/null +++ b/beater/middleware/rate_limit_middleware_test.go @@ -0,0 +1,107 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package middleware + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/apm-server/beater/api/ratelimit" + "github.com/elastic/apm-server/beater/request" +) + +func TestAnonymousRateLimitMiddleware(t *testing.T) { + type test struct { + burst int + anonymous bool + + expectStatusCode int + expectAllow bool + } + for _, test := range []test{{ + burst: 0, + anonymous: false, + expectStatusCode: http.StatusOK, + }, { + burst: 0, + anonymous: true, + expectStatusCode: http.StatusTooManyRequests, + }, { + burst: 1, + anonymous: true, + expectStatusCode: http.StatusOK, + expectAllow: false, + }, { + burst: 2, + anonymous: true, + expectStatusCode: http.StatusOK, + expectAllow: true, + }} { + store, _ := ratelimit.NewStore(1, 1, test.burst) + middleware := AnonymousRateLimitMiddleware(store) + handler := func(c *request.Context) { + limiter, ok := ratelimit.FromContext(c.Request.Context()) + if test.anonymous { + require.True(t, ok) + assert.Equal(t, test.expectAllow, limiter.Allow()) + } else { + require.False(t, ok) + } + } + wrapped, err := middleware(handler) + require.NoError(t, err) + + c := request.NewContext() + w := httptest.NewRecorder() + r := httptest.NewRequest("GET", "/", nil) + c.Reset(w, r) + c.AuthResult.Anonymous = test.anonymous + + wrapped(c) + assert.Equal(t, test.expectStatusCode, w.Code) + } +} + +func TestAnonymousRateLimitMiddlewareForIP(t *testing.T) { + store, _ := ratelimit.NewStore(2, 1, 1) + middleware := AnonymousRateLimitMiddleware(store) + handler := func(c *request.Context) {} + wrapped, err := middleware(handler) + require.NoError(t, err) + + requestWithIP := func(ip string) int { + c := request.NewContext() + w := httptest.NewRecorder() + r := httptest.NewRequest("GET", "/", nil) + r.RemoteAddr = ip + c.Reset(w, r) + c.AuthResult.Anonymous = true + wrapped(c) + return w.Code + } + assert.Equal(t, http.StatusOK, requestWithIP("10.1.1.1")) + assert.Equal(t, http.StatusTooManyRequests, requestWithIP("10.1.1.1")) + assert.Equal(t, http.StatusOK, requestWithIP("10.1.1.2")) + + // ratelimit.Store size is 2: the 3rd IP reuses an existing (depleted) rate limiter. + assert.Equal(t, http.StatusTooManyRequests, requestWithIP("10.1.1.3")) +} diff --git a/beater/middleware/request_metadata_middleware.go b/beater/middleware/request_metadata_middleware.go deleted file mode 100644 index 9bb57d44808..00000000000 --- a/beater/middleware/request_metadata_middleware.go +++ /dev/null @@ -1,46 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package middleware - -import ( - "github.com/elastic/apm-server/beater/request" - "github.com/elastic/apm-server/utility" -) - -// UserMetadataMiddleware returns a Middleware recording request-level -// user metadata (e.g. user-agent and source IP) in the request's context. -func UserMetadataMiddleware() Middleware { - return func(h request.Handler) (request.Handler, error) { - return func(c *request.Context) { - c.RequestMetadata.UserAgent = utility.UserAgentHeader(c.Request.Header) - c.RequestMetadata.ClientIP = utility.ExtractIP(c.Request) - h(c) - }, nil - } -} - -// SystemMetadataMiddleware returns a Middleware recording request-level -// system metadata (e.g. source IP) in the request's context. -func SystemMetadataMiddleware() Middleware { - return func(h request.Handler) (request.Handler, error) { - return func(c *request.Context) { - c.RequestMetadata.SystemIP = utility.ExtractIP(c.Request) - h(c) - }, nil - } -} diff --git a/beater/middleware/request_metadata_middleware_test.go b/beater/middleware/request_metadata_middleware_test.go deleted file mode 100644 index ac69123c5c3..00000000000 --- a/beater/middleware/request_metadata_middleware_test.go +++ /dev/null @@ -1,77 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package middleware - -import ( - "fmt" - "net" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/elastic/apm-server/beater/beatertest" -) - -func TestUserMetadataMiddleware(t *testing.T) { - type test struct { - remoteAddr string - userAgent []string - expectedIP net.IP - expectedUserAgent string - } - - ua1 := "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36" - ua2 := "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:67.0) Gecko/20100101 Firefox/67.0" - tests := []test{ - {remoteAddr: "1.2.3.4:1234", expectedIP: net.ParseIP("1.2.3.4"), userAgent: []string{ua1, ua2}, expectedUserAgent: fmt.Sprintf("%s, %s", ua1, ua2)}, - {remoteAddr: "not-an-ip:1234", userAgent: []string{ua1}, expectedUserAgent: ua1}, - {remoteAddr: ""}, - } - - for _, test := range tests { - c, _ := beatertest.DefaultContextWithResponseRecorder() - c.Request.RemoteAddr = test.remoteAddr - for _, ua := range test.userAgent { - c.Request.Header.Add("User-Agent", ua) - } - - Apply(UserMetadataMiddleware(), beatertest.HandlerIdle)(c) - assert.Equal(t, test.expectedUserAgent, c.RequestMetadata.UserAgent) - assert.Equal(t, test.expectedIP, c.RequestMetadata.ClientIP) - } -} - -func TestSystemMetadataMiddleware(t *testing.T) { - type test struct { - remoteAddr string - expectedIP net.IP - } - tests := []test{ - {remoteAddr: "1.2.3.4:1234", expectedIP: net.ParseIP("1.2.3.4")}, - {remoteAddr: "not-an-ip:1234"}, - {remoteAddr: ""}, - } - - for _, test := range tests { - c, _ := beatertest.DefaultContextWithResponseRecorder() - c.Request.RemoteAddr = test.remoteAddr - - Apply(SystemMetadataMiddleware(), beatertest.HandlerIdle)(c) - assert.Equal(t, test.expectedIP, c.RequestMetadata.SystemIP) - } -} diff --git a/beater/request/context.go b/beater/request/context.go index 118643ef412..4478ce9a8e3 100644 --- a/beater/request/context.go +++ b/beater/request/context.go @@ -23,13 +23,12 @@ import ( "net/http" "strings" - "golang.org/x/time/rate" - "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/apm-server/beater/authorization" "github.com/elastic/apm-server/beater/headers" logs "github.com/elastic/apm-server/log" + "github.com/elastic/apm-server/utility" ) const ( @@ -43,26 +42,17 @@ var ( // Context abstracts request and response information for http requests type Context struct { - Request *http.Request - Logger *logp.Logger - RateLimiter *rate.Limiter - AuthResult authorization.Result - IsRum bool - Result Result - RequestMetadata Metadata + Request *http.Request + Logger *logp.Logger + AuthResult authorization.Result + Result Result + SourceIP net.IP + UserAgent string w http.ResponseWriter writeAttempts int } -// Metadata contains metadata extracted from the request by middleware, -// and should be merged into the event metadata. -type Metadata struct { - ClientIP net.IP - SystemIP net.IP - UserAgent string -} - // NewContext creates an empty Context struct func NewContext() *Context { return &Context{} @@ -72,23 +62,15 @@ func NewContext() *Context { func (c *Context) Reset(w http.ResponseWriter, r *http.Request) { c.Request = r c.Logger = nil - c.RateLimiter = nil c.AuthResult = authorization.Result{} - c.IsRum = false c.Result.Reset() - c.RequestMetadata.Reset() + c.SourceIP = utility.ExtractIP(r) + c.UserAgent = utility.UserAgentHeader(r.Header) c.w = w c.writeAttempts = 0 } -// Reset sets all attribtues of the Metadata instance to it's zero value -func (m *Metadata) Reset() { - m.ClientIP = nil - m.SystemIP = nil - m.UserAgent = "" -} - // Header returns the http.Header of the context's writer func (c *Context) Header() http.Header { return c.w.Header() diff --git a/beater/request/context_test.go b/beater/request/context_test.go index aa916ac5158..2ec5eb9e1ed 100644 --- a/beater/request/context_test.go +++ b/beater/request/context_test.go @@ -18,6 +18,7 @@ package request import ( + "net" "net/http" "net/http/httptest" "reflect" @@ -37,7 +38,11 @@ func TestContext_Reset(t *testing.T) { w1.WriteHeader(http.StatusServiceUnavailable) w2 := httptest.NewRecorder() r1 := httptest.NewRequest(http.MethodGet, "/", nil) + r1.RemoteAddr = "10.1.2.3:4321" + r1.Header.Set("User-Agent", "ua1") r2 := httptest.NewRequest(http.MethodHead, "/new", nil) + r2.RemoteAddr = "10.1.2.3:1234" + r2.Header.Set("User-Agent", "ua2") c := Context{ Request: r1, w: w1, @@ -65,8 +70,10 @@ func TestContext_Reset(t *testing.T) { assert.Equal(t, 0, c.writeAttempts) case "Result": assertResultIsEmpty(t, cVal.Field(i).Interface().(Result)) - case "RequestMetadata": - assert.Equal(t, Metadata{}, cVal.Field(i).Interface().(Metadata)) + case "SourceIP": + assert.Equal(t, net.ParseIP("10.1.2.3"), cVal.Field(i).Interface()) + case "UserAgent": + assert.Equal(t, "ua2", cVal.Field(i).Interface()) default: assert.Empty(t, cVal.Field(i).Interface(), cType.Field(i).Name) } diff --git a/beater/server.go b/beater/server.go index 7d144321402..11179ebb31b 100644 --- a/beater/server.go +++ b/beater/server.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/apm-server/agentcfg" + "github.com/elastic/apm-server/beater/api/ratelimit" "github.com/elastic/apm-server/beater/authorization" "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/beater/interceptors" @@ -116,16 +117,24 @@ func newServer( reporter publish.Reporter, batchProcessor model.BatchProcessor, ) (server, error) { - fetcher := agentcfg.NewFetcher(cfg) - httpServer, err := newHTTPServer(logger, info, cfg, tracer, reporter, batchProcessor, fetcher) + agentcfgFetcher := agentcfg.NewFetcher(cfg) + ratelimitStore, err := ratelimit.NewStore( + cfg.RumConfig.EventRate.LruSize, + cfg.RumConfig.EventRate.Limit, + 3, // burst multiplier + ) if err != nil { return server{}, err } - grpcServer, err := newGRPCServer(logger, cfg, tracer, batchProcessor, httpServer.TLSConfig, fetcher) + httpServer, err := newHTTPServer(logger, info, cfg, tracer, reporter, batchProcessor, agentcfgFetcher, ratelimitStore) if err != nil { return server{}, err } - jaegerServer, err := jaeger.NewServer(logger, cfg, tracer, batchProcessor, fetcher) + grpcServer, err := newGRPCServer(logger, cfg, tracer, batchProcessor, httpServer.TLSConfig, agentcfgFetcher, ratelimitStore) + if err != nil { + return server{}, err + } + jaegerServer, err := jaeger.NewServer(logger, cfg, tracer, batchProcessor, agentcfgFetcher) if err != nil { return server{}, err } @@ -144,7 +153,8 @@ func newGRPCServer( tracer *apm.Tracer, batchProcessor model.BatchProcessor, tlsConfig *tls.Config, - fetcher agentcfg.Fetcher, + agentcfgFetcher agentcfg.Fetcher, + ratelimitStore *ratelimit.Store, ) (*grpc.Server, error) { // TODO(axw) share auth builder with beater/api. authBuilder, err := authorization.NewBuilder(cfg.AgentAuth) @@ -152,10 +162,11 @@ func newGRPCServer( return nil, err } - // NOTE(axw) even if TLS is enabled we should not use grpc.Creds, as TLS is handled by the net/http server. apmInterceptor := apmgrpc.NewUnaryServerInterceptor(apmgrpc.WithRecovery(), apmgrpc.WithTracer(tracer)) authInterceptor := newAuthUnaryServerInterceptor(authBuilder) + // Note that we intentionally do not use a grpc.Creds ServerOption + // even if TLS is enabled, as TLS is handled by the net/http server. logger = logger.Named("grpc") srv := grpc.NewServer( grpc.ChainUnaryInterceptor( @@ -165,6 +176,9 @@ func newGRPCServer( interceptors.Metrics(logger, otlp.RegistryMonitoringMaps, jaeger.RegistryMonitoringMaps), interceptors.Timeout(), authInterceptor, + + // TODO(axw) add a rate limiting interceptor here once we've + // updated authInterceptor to handle auth for Jaeger requests. ), ) @@ -182,7 +196,7 @@ func newGRPCServer( batchProcessor, } - jaeger.RegisterGRPCServices(srv, authBuilder, jaeger.ElasticAuthTag, logger, batchProcessor, fetcher) + jaeger.RegisterGRPCServices(srv, authBuilder, jaeger.ElasticAuthTag, logger, batchProcessor, agentcfgFetcher) if err := otlp.RegisterGRPCServices(srv, batchProcessor); err != nil { return nil, err } diff --git a/beater/tracing.go b/beater/tracing.go index 8d37f979ea5..6a9f791795b 100644 --- a/beater/tracing.go +++ b/beater/tracing.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/apm-server/agentcfg" "github.com/elastic/apm-server/beater/api" + "github.com/elastic/apm-server/beater/api/ratelimit" "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/model" "github.com/elastic/apm-server/publish" @@ -59,7 +60,18 @@ func newTracerServer(listener net.Listener, logger *logp.Logger) (*tracerServer, } }) cfg := config.DefaultConfig() - mux, err := api.NewMux(beat.Info{}, cfg, nopReporter, processBatch, agentcfg.NewFetcher(cfg)) + ratelimitStore, err := ratelimit.NewStore(1, 1, 1) // unused, arbitrary params + if err != nil { + return nil, err + } + mux, err := api.NewMux( + beat.Info{}, + cfg, + nopReporter, + processBatch, + agentcfg.NewFetcher(cfg), + ratelimitStore, + ) if err != nil { return nil, err } diff --git a/systemtest/rum_test.go b/systemtest/rum_test.go index 7f72a3ea6d2..4d82b0a71ef 100644 --- a/systemtest/rum_test.go +++ b/systemtest/rum_test.go @@ -192,7 +192,12 @@ func TestRUMRateLimit(t *testing.T) { g.Go(func() error { return sendEvents("10.11.12.13", srv.Config.RUM.RateLimit.EventLimit) }) g.Go(func() error { return sendEvents("10.11.12.14", srv.Config.RUM.RateLimit.EventLimit) }) g.Go(func() error { return sendEvents("10.11.12.15", srv.Config.RUM.RateLimit.EventLimit) }) - assert.EqualError(t, g.Wait(), `429 Too Many Requests ({"accepted":0,"errors":[{"message":"rate limit exceeded"}]})`) + err = g.Wait() + require.Error(t, err) + + // The exact error differs, depending on whether rate limiting was applied at the request + // level, or at the event stream level. Either could occur. + assert.Regexp(t, `429 Too Many Requests .*`, err.Error()) } func sendRUMEventsPayload(t *testing.T, srv *apmservertest.Server, payloadFile string) { From bd5290984db907d8b6dfaeafdc2028fab7725166 Mon Sep 17 00:00:00 2001 From: Julien Mailleret <8582351+jmlrt@users.noreply.github.com> Date: Tue, 22 Jun 2021 03:45:22 +0200 Subject: [PATCH 4/5] Fix UBI source URL (#5506) This commit fix the source URL for UBI image to ensure that it stays consistent with the URL generated in https://artifacts.elastic.co/reports/dependencies/dependencies-current.html --- script/generate_notice.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/script/generate_notice.py b/script/generate_notice.py index fd7196f19e0..7916479ecb8 100644 --- a/script/generate_notice.py +++ b/script/generate_notice.py @@ -33,7 +33,7 @@ "version": "8", "url": "https://catalog.redhat.com/software/containers/ubi8/ubi-minimal/5c359a62bed8bd75a2c3fba8", "license": "Custom;https://www.redhat.com/licenses/EULA_Red_Hat_Universal_Base_Image_English_20190422.pdf", - "sourceURL": "https://oss-dependencies.elastic.co/redhat/ubi/ubi-minimal-8-source.tar.gz", + "sourceURL": "https://oss-dependencies.elastic.co/red-hat-universal-base-image-minimal/8/ubi-minimal-8-source.tar.gz", }] From 9f7d9b68b6070fee25efc13836f09cbcc2171b77 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Tue, 22 Jun 2021 15:22:58 +0800 Subject: [PATCH 5/5] systemtest: remove TestDataStream*, fix Fleet test (#5503) Remove the TestDataStreams tests, which test the server's internal/private/undocumneted configuration for enabling data streams. At least for now, this is only intended to be used with Fleet, so we should instead just rely on the Fleet system test. There was a bug in the Fleet test where apm-server would create "traces-apm-default" as a plain old index instead of a data stream in certain conditions: if the apm integration had previously been installed, then the traces-apm template would be deleted by CleanupElasticsearch. Because the package was already installed, creating the policy would not automatically reinstall it and add the missing template. We fix the Fleet issue by uninstalling the package during Fleet cleanup, after unenrolling agents. --- systemtest/apmservertest/filter.go | 21 ++- systemtest/apmservertest/server.go | 7 +- .../false.approved.json | 76 ---------- ...son => TestFleetIntegration.approved.json} | 4 +- systemtest/datastreams_test.go | 140 ------------------ systemtest/fleet_test.go | 52 ++++++- systemtest/fleettest/client.go | 11 ++ 7 files changed, 76 insertions(+), 235 deletions(-) delete mode 100644 systemtest/approvals/TestDataStreamsEnabled/false.approved.json rename systemtest/approvals/{TestDataStreamsEnabled/true.approved.json => TestFleetIntegration.approved.json} (94%) delete mode 100644 systemtest/datastreams_test.go diff --git a/systemtest/apmservertest/filter.go b/systemtest/apmservertest/filter.go index a851e1fd772..d41926a9394 100644 --- a/systemtest/apmservertest/filter.go +++ b/systemtest/apmservertest/filter.go @@ -29,7 +29,7 @@ import ( "go.elastic.co/fastjson" ) -// TODO(axw) move EventMetadata and filteringTransport to go.elastic.co/apmtest, +// TODO(axw) move EventMetadata and FilteringTransport to go.elastic.co/apmtest, // generalising filteringTransport to work with arbitrary base transports. To do // that we would need to dynamically check for optional interfaces supported by // the base transport, and create passthrough methods. @@ -47,14 +47,22 @@ type EventMetadataFilter interface { FilterEventMetadata(*EventMetadata) } -type filteringTransport struct { +// FilteringTransport is a transport for the APM Go agent which modifies events +// prior to sending them to the underlying transport. +type FilteringTransport struct { *transport.HTTPTransport filter EventMetadataFilter } +// NewFilteringTransport returns a new FilteringTransport that filters events +// using f, and sends them on to h. +func NewFilteringTransport(h *transport.HTTPTransport, f EventMetadataFilter) *FilteringTransport { + return &FilteringTransport{h, f} +} + // SendStream decodes metadata from reader, passes it through the filters, // and then sends the modified stream to the underlying transport. -func (t *filteringTransport) SendStream(ctx context.Context, stream io.Reader) error { +func (t *FilteringTransport) SendStream(ctx context.Context, stream io.Reader) error { zr, err := zlib.NewReader(stream) if err != nil { return err @@ -98,9 +106,12 @@ func (t *filteringTransport) SendStream(ctx context.Context, stream io.Reader) e return t.HTTPTransport.SendStream(ctx, &buf) } -type defaultMetadataFilter struct{} +// DefaultMetadataFilter implements EventMetadataFilter, setting some default values +// for fields that would otherwise by dynamically discovered. +type DefaultMetadataFilter struct{} -func (defaultMetadataFilter) FilterEventMetadata(m *EventMetadata) { +// FilterEventMetadata updates m with default values for dynamically discovered fields. +func (DefaultMetadataFilter) FilterEventMetadata(m *EventMetadata) { m.System.Platform = "minix" m.System.Architecture = "i386" m.System.Container = nil diff --git a/systemtest/apmservertest/server.go b/systemtest/apmservertest/server.go index 63264cb6d33..f180a92f97e 100644 --- a/systemtest/apmservertest/server.go +++ b/systemtest/apmservertest/server.go @@ -117,7 +117,7 @@ func NewServer(tb testing.TB, args ...string) *Server { func NewUnstartedServer(tb testing.TB, args ...string) *Server { return &Server{ Config: DefaultConfig(), - EventMetadataFilter: defaultMetadataFilter{}, + EventMetadataFilter: DefaultMetadataFilter{}, tb: tb, args: args, } @@ -469,10 +469,7 @@ func (s *Server) Tracer() *apm.Tracer { var transport transport.Transport = httpTransport if s.EventMetadataFilter != nil { - transport = &filteringTransport{ - HTTPTransport: httpTransport, - filter: s.EventMetadataFilter, - } + transport = NewFilteringTransport(httpTransport, s.EventMetadataFilter) } tracer, err := apm.NewTracerOptions(apm.TracerOptions{Transport: transport}) if err != nil { diff --git a/systemtest/approvals/TestDataStreamsEnabled/false.approved.json b/systemtest/approvals/TestDataStreamsEnabled/false.approved.json deleted file mode 100644 index 5b4a28ca9c8..00000000000 --- a/systemtest/approvals/TestDataStreamsEnabled/false.approved.json +++ /dev/null @@ -1,76 +0,0 @@ -{ - "events": [ - { - "@timestamp": "dynamic", - "agent": { - "name": "go", - "version": "0.0.0" - }, - "ecs": { - "version": "dynamic" - }, - "event": { - "ingested": "dynamic", - "outcome": "unknown" - }, - "host": { - "architecture": "i386", - "hostname": "beowulf", - "ip": "127.0.0.1", - "name": "beowulf", - "os": { - "platform": "minix" - } - }, - "observer": { - "ephemeral_id": "dynamic", - "hostname": "dynamic", - "id": "dynamic", - "type": "apm-server", - "version": "dynamic", - "version_major": "dynamic" - }, - "process": { - "pid": 1, - "title": "systemtest.test" - }, - "processor": { - "event": "transaction", - "name": "transaction" - }, - "service": { - "language": { - "name": "go", - "version": "2.0" - }, - "name": "systemtest", - "node": { - "name": "beowulf" - }, - "runtime": { - "name": "gc", - "version": "2.0" - } - }, - "timestamp": { - "us": "dynamic" - }, - "trace": { - "id": "dynamic" - }, - "transaction": { - "duration": { - "us": 1000000 - }, - "id": "dynamic", - "name": "name", - "sampled": true, - "span_count": { - "dropped": 0, - "started": 0 - }, - "type": "type" - } - } - ] -} diff --git a/systemtest/approvals/TestDataStreamsEnabled/true.approved.json b/systemtest/approvals/TestFleetIntegration.approved.json similarity index 94% rename from systemtest/approvals/TestDataStreamsEnabled/true.approved.json rename to systemtest/approvals/TestFleetIntegration.approved.json index 28ca86e8ed5..09739f5aa71 100644 --- a/systemtest/approvals/TestDataStreamsEnabled/true.approved.json +++ b/systemtest/approvals/TestFleetIntegration.approved.json @@ -13,12 +13,14 @@ "version": "dynamic" }, "event": { + "agent_id_status": "untrusted_user", + "ingested": "dynamic", "outcome": "unknown" }, "host": { "architecture": "i386", "hostname": "beowulf", - "ip": "127.0.0.1", + "ip": "10.11.12.13", "name": "beowulf", "os": { "platform": "minix" diff --git a/systemtest/datastreams_test.go b/systemtest/datastreams_test.go deleted file mode 100644 index 88aad971761..00000000000 --- a/systemtest/datastreams_test.go +++ /dev/null @@ -1,140 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package systemtest_test - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap/zapcore" - - "github.com/elastic/apm-server/systemtest" - "github.com/elastic/apm-server/systemtest/apmservertest" - "github.com/elastic/apm-server/systemtest/estest" -) - -func TestDataStreamsEnabled(t *testing.T) { - for _, enabled := range []bool{false, true} { - t.Run(fmt.Sprint(enabled), func(t *testing.T) { - systemtest.CleanupElasticsearch(t) - srv := apmservertest.NewUnstartedServer(t) - if enabled { - // Enable data streams. - srv.Config.DataStreams = &apmservertest.DataStreamsConfig{Enabled: true} - srv.Config.Setup = nil - - // Create a data stream index template. - resp, err := systemtest.Elasticsearch.Indices.PutIndexTemplate("apm-data-streams", strings.NewReader(fmt.Sprintf(`{ - "index_patterns": ["traces-apm*", "logs-apm*", "metrics-apm*"], - "data_stream": {}, - "priority": 200, - "template": {"settings": {"number_of_shards": 1, "refresh_interval": "250ms"}} - }`))) - require.NoError(t, err) - body, _ := ioutil.ReadAll(resp.Body) - require.False(t, resp.IsError(), string(body)) - - // Create an API Key which can write to traces-* etc. - // The default APM Server user can only write to apm-*. - // - // NOTE(axw) importantly, this API key lacks privileges - // to manage templates, pipelines, ILM, etc. Enabling - // data streams should disable all automatic setup. - resp, err = systemtest.Elasticsearch.Security.CreateAPIKey(strings.NewReader(fmt.Sprintf(`{ - "name": "%s", - "expiration": "1h", - "role_descriptors": { - "write-apm-data": { - "cluster": ["monitor"], - "index": [ - { - "names": ["traces-*", "metrics-*", "logs-*"], - "privileges": ["write", "create_index"] - } - ] - } - } - }`, t.Name()))) - require.NoError(t, err) - - var apiKeyResponse struct { - ID string - Name string - APIKey string `json:"api_key"` - } - require.NoError(t, json.NewDecoder(resp.Body).Decode(&apiKeyResponse)) - - // Use an API Key to mimic running under Fleet, with limited permissions. - srv.Config.Output.Elasticsearch.Username = "" - srv.Config.Output.Elasticsearch.Password = "" - srv.Config.Output.Elasticsearch.APIKey = fmt.Sprintf("%s:%s", apiKeyResponse.ID, apiKeyResponse.APIKey) - } - require.NoError(t, srv.Start()) - - tracer := srv.Tracer() - tx := tracer.StartTransaction("name", "type") - tx.Duration = time.Second - tx.End() - tracer.Flush(nil) - - result := systemtest.Elasticsearch.ExpectDocs(t, "apm-*,traces-apm*", estest.TermQuery{ - Field: "processor.event", Value: "transaction", - }) - systemtest.ApproveEvents( - t, t.Name(), result.Hits.Hits, - "@timestamp", "timestamp.us", - "trace.id", "transaction.id", - ) - - // Assert there are no unexpected warnings or errors. - for _, record := range srv.Logs.All() { - assert.Condition(t, func() bool { - if record.Level == zapcore.ErrorLevel { - return assert.Equal(t, "Started apm-server with data streams enabled but no active fleet management mode was specified", record.Message) - } - return record.Level < zapcore.WarnLevel - }, "%s: %s", record.Level, record.Message) - } - }) - } -} - -func TestDataStreamsSetupErrors(t *testing.T) { - cfg := apmservertest.DefaultConfig() - cfg.DataStreams = &apmservertest.DataStreamsConfig{Enabled: true} - cfgargs, err := cfg.Args() - require.NoError(t, err) - - test := func(args []string, expected string) { - args = append(args, cfgargs...) - cmd := apmservertest.ServerCommand("setup", args...) - out, err := cmd.CombinedOutput() - require.Error(t, err) - assert.Equal(t, "Exiting: "+expected+"\n", string(out)) - } - - test([]string{}, "index setup must be performed externally when using data streams, by installing the 'apm' integration package") - test([]string{"--index-management"}, "index setup must be performed externally when using data streams, by installing the 'apm' integration package") - test([]string{"--pipelines"}, "index pipeline setup must be performed externally when using data streams, by installing the 'apm' integration package") -} diff --git a/systemtest/fleet_test.go b/systemtest/fleet_test.go index 761bccaad3c..fa5a7afab7d 100644 --- a/systemtest/fleet_test.go +++ b/systemtest/fleet_test.go @@ -20,6 +20,7 @@ package systemtest_test import ( "context" "io/ioutil" + "net/http" "net/url" "testing" "time" @@ -31,6 +32,7 @@ import ( "go.elastic.co/apm/transport" "github.com/elastic/apm-server/systemtest" + "github.com/elastic/apm-server/systemtest/apmservertest" "github.com/elastic/apm-server/systemtest/fleettest" ) @@ -83,17 +85,36 @@ func TestFleetIntegration(t *testing.T) { require.NoError(t, err) // Elastic Agent has started apm-server. Connect to apm-server and send some data, - // and make sure it gets indexed into a data stream. - transport, err := transport.NewHTTPTransport() + // and make sure it gets indexed into a data stream. We override the transport to + // set known metadata. + httpTransport, err := transport.NewHTTPTransport() require.NoError(t, err) - transport.SetServerURL(&url.URL{Scheme: "http", Host: agent.Addrs["8200"]}) - tracer, err := apm.NewTracerOptions(apm.TracerOptions{Transport: transport}) + origTransport := httpTransport.Client.Transport + httpTransport.Client.Transport = roundTripperFunc(func(r *http.Request) (*http.Response, error) { + r.Header.Set("X-Real-Ip", "10.11.12.13") + return origTransport.RoundTrip(r) + }) + httpTransport.SetServerURL(&url.URL{Scheme: "http", Host: agent.Addrs["8200"]}) + tracer, err := apm.NewTracerOptions(apm.TracerOptions{ + Transport: apmservertest.NewFilteringTransport( + httpTransport, + apmservertest.DefaultMetadataFilter{}, + ), + }) require.NoError(t, err) defer tracer.Close() - tracer.StartTransaction("name", "type").End() + + tx := tracer.StartTransaction("name", "type") + tx.Duration = time.Second + tx.End() tracer.Flush(nil) - systemtest.Elasticsearch.ExpectDocs(t, "traces-*", nil) + result := systemtest.Elasticsearch.ExpectDocs(t, "traces-*", nil) + systemtest.ApproveEvents( + t, t.Name(), result.Hits.Hits, + "@timestamp", "timestamp.us", + "trace.id", "transaction.id", + ) } func TestFleetPackageNonMultiple(t *testing.T) { @@ -143,7 +164,16 @@ func initAPMIntegrationPackagePolicyInputs(t *testing.T, packagePolicy *fleettes } } -func getAPMIntegrationPackage(t *testing.T, fleet *fleettest.Client) *fleettest.Package { +func cleanupFleet(t testing.TB, fleet *fleettest.Client) { + cleanupFleetPolicies(t, fleet) + apmPackage := getAPMIntegrationPackage(t, fleet) + if apmPackage.Status == "installed" { + err := fleet.DeletePackage(apmPackage.Name, apmPackage.Version) + require.NoError(t, err) + } +} + +func getAPMIntegrationPackage(t testing.TB, fleet *fleettest.Client) *fleettest.Package { var apmPackage *fleettest.Package packages, err := fleet.ListPackages() require.NoError(t, err) @@ -161,7 +191,7 @@ func getAPMIntegrationPackage(t *testing.T, fleet *fleettest.Client) *fleettest. panic("unreachable") } -func cleanupFleet(t testing.TB, fleet *fleettest.Client) { +func cleanupFleetPolicies(t testing.TB, fleet *fleettest.Client) { apmAgentPolicies, err := fleet.AgentPolicies("ingest-agent-policies.name:apm_systemtest") require.NoError(t, err) if len(apmAgentPolicies) == 0 { @@ -187,3 +217,9 @@ func cleanupFleet(t testing.TB, fleet *fleettest.Client) { require.NoError(t, err) } } + +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { + return f(r) +} diff --git a/systemtest/fleettest/client.go b/systemtest/fleettest/client.go index 5034bd19f87..0f93351f2dd 100644 --- a/systemtest/fleettest/client.go +++ b/systemtest/fleettest/client.go @@ -239,6 +239,17 @@ func (c *Client) Package(name, version string) (*Package, error) { return &result.Response, nil } +// DeletePackage deletes (uninstalls) the package with the given name and version. +func (c *Client) DeletePackage(name, version string) error { + req := c.newFleetRequest("DELETE", "/epm/packages/"+name+"-"+version, nil) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + return consumeResponse(resp, nil) +} + // PackagePolicy returns information about the package policy with the given ID. func (c *Client) PackagePolicy(id string) (*PackagePolicy, error) { resp, err := http.Get(c.fleetURL + "/package_policies/" + id)