diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index c2daabf1fc9..123da7ed75e 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -28,6 +28,7 @@ https://github.com/elastic/apm-server/compare/6.4\...master[View commits] - Require either `message` or `type` for `error.exception` {pull}1354[1354]. - Require `span.parent_id`, forbid `null` for `span.trace_id`, `transaction.trace_id` {pull}1391[1391] - Require `error.id` and changed description to 128 random bits ID {pull}1384[1384] +- Add rate limit handling per event {pull}1367[1367] [[release-notes-6.4]] == APM Server version 6.4 diff --git a/_meta/beat.yml b/_meta/beat.yml index a602ff6f656..b9b349852d8 100644 --- a/_meta/beat.yml +++ b/_meta/beat.yml @@ -42,9 +42,26 @@ apm-server: # To enable real user monitoring (RUM) support set this to true. #enabled: false + #-- v1 RUM endpoint + # Rate limit per second and IP address for requests sent to the RUM endpoint. #rate_limit: 10 + #-- v2 RUM endpoint + + #event_rate: + + # Defines the maximum amount of events allowed to be sent to the APM Server v2 RUM + # endpoint per ip per second. Defaults to 300. + #limit: 300 + + # An LRU cache is used to keep a rate limit per IP for the most recently seen IPs. + # This setting defines the number of unique IPs that can be tracked in the cache. + # Sites with many concurrent clients should consider increasing this limit. Defaults to 1000. + #lru_size: 1000 + + #-- General RUM settings + # Comma separated list of permitted origins for real user monitoring. # User-agents will send an origin header that will be validated against this list. # An origin is made of a protocol scheme, host and port, without the url path. @@ -90,6 +107,7 @@ apm-server: # is changed, a matching index pattern needs to be specified here. 
#index_pattern: "apm-*-sourcemap*" + # If set to true, APM Server augments data received by the agent with the original IP of the backend server, # or the IP and User Agent of the real user (RUM requests). It defaults to true. #capture_personal_data: true diff --git a/apm-server.yml b/apm-server.yml index 464d1a4fddb..dd2f414c619 100644 --- a/apm-server.yml +++ b/apm-server.yml @@ -42,9 +42,26 @@ apm-server: # To enable real user monitoring (RUM) support set this to true. #enabled: false + #-- v1 RUM endpoint + # Rate limit per second and IP address for requests sent to the RUM endpoint. #rate_limit: 10 + #-- v2 RUM endpoint + + #event_rate: + + # Defines the maximum amount of events allowed to be sent to the APM Server v2 RUM + # endpoint per ip per second. Defaults to 300. + #limit: 300 + + # An LRU cache is used to keep a rate limit per IP for the most recently seen IPs. + # This setting defines the number of unique IPs that can be tracked in the cache. + # Sites with many concurrent clients should consider increasing this limit. Defaults to 1000. + #lru_size: 1000 + + #-- General RUM settings + # Comma separated list of permitted origins for real user monitoring. # User-agents will send an origin header that will be validated against this list. # An origin is made of a protocol scheme, host and port, without the url path. @@ -90,6 +107,7 @@ apm-server: # is changed, a matching index pattern needs to be specified here. #index_pattern: "apm-*-sourcemap*" + # If set to true, APM Server augments data received by the agent with the original IP of the backend server, # or the IP and User Agent of the real user (RUM requests). It defaults to true. 
#capture_personal_data: true diff --git a/beater/approved-stream-result/TestRequestIntegrationRumRateLimit.approved.json b/beater/approved-stream-result/TestRequestIntegrationRumRateLimit.approved.json new file mode 100644 index 00000000000..748c625209c --- /dev/null +++ b/beater/approved-stream-result/TestRequestIntegrationRumRateLimit.approved.json @@ -0,0 +1,8 @@ +{ + "accepted": 30, + "errors": [ + { + "message": "rate limit exceeded" + } + ] +} diff --git a/beater/beater.go b/beater/beater.go index 45a0681f8c5..637f0507bf0 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -18,16 +18,13 @@ package beater import ( - "fmt" "net" "net/http" "net/url" "os" - "regexp" "sync" "time" - "github.com/pkg/errors" "golang.org/x/sync/errgroup" "github.com/elastic/apm-agent-go" @@ -52,18 +49,11 @@ type beater struct { // Creates beater func New(b *beat.Beat, ucfg *common.Config) (beat.Beater, error) { logger := logp.NewLogger("beater") - beaterConfig := defaultConfig(b.Info.Version) - if err := ucfg.Unpack(beaterConfig); err != nil { - return nil, errors.Wrap(err, "Error processing configuration") + beaterConfig, err := NewConfig(b.Info.Version, ucfg) + if err != nil { + return nil, err } - beaterConfig.SetRumConfig() if beaterConfig.RumConfig.isEnabled() { - if _, err := regexp.Compile(beaterConfig.RumConfig.LibraryPattern); err != nil { - return nil, errors.New(fmt.Sprintf("Invalid regex for `library_pattern`: %v", err.Error())) - } - if _, err := regexp.Compile(beaterConfig.RumConfig.ExcludeFromGrouping); err != nil { - return nil, errors.New(fmt.Sprintf("Invalid regex for `exclude_from_grouping`: %v", err.Error())) - } if b.Config != nil && beaterConfig.RumConfig.SourceMapping.EsConfig == nil { // fall back to elasticsearch output configuration for sourcemap storage if possible if isElasticsearchOutput(b) { diff --git a/beater/beater_test.go b/beater/beater_test.go index 559b20a84d0..d17ade3173f 100644 --- a/beater/beater_test.go +++ 
b/beater/beater_test.go @@ -86,8 +86,12 @@ func TestBeatConfig(t *testing.T) { "url": "/debug/vars", }, "frontend": map[string]interface{}{ - "enabled": true, - "rate_limit": 1000, + "enabled": true, + "rate_limit": 1000, + "event_rate": map[string]interface{}{ + "limit": 7200, + "lru_size": 2000, + }, "allow_origins": []string{"example*"}, "source_mapping": map[string]interface{}{ "cache": map[string]interface{}{ @@ -128,8 +132,12 @@ func TestBeatConfig(t *testing.T) { Url: "/debug/vars", }, FrontendConfig: &rumConfig{ - Enabled: &truthy, - RateLimit: 1000, + Enabled: &truthy, + RateLimit: 1000, + EventRate: &eventRate{ + Limit: 7200, + LruSize: 2000, + }, AllowOrigins: []string{"example*"}, SourceMapping: &SourceMapping{ Cache: &Cache{Expiration: 8 * time.Minute}, @@ -140,8 +148,12 @@ func TestBeatConfig(t *testing.T) { beatVersion: "6.2.0", }, RumConfig: &rumConfig{ - Enabled: &truthy, - RateLimit: 1000, + Enabled: &truthy, + RateLimit: 1000, + EventRate: &eventRate{ + Limit: 7200, + LruSize: 2000, + }, AllowOrigins: []string{"example*"}, SourceMapping: &SourceMapping{ Cache: &Cache{Expiration: 8 * time.Minute}, @@ -182,6 +194,9 @@ func TestBeatConfig(t *testing.T) { "frontend": map[string]interface{}{ "enabled": true, "rate_limit": 890, + "event_rate": map[string]interface{}{ + "lru_size": 200, + }, "source_mapping": map[string]interface{}{ "cache": map[string]interface{}{ "expiration": 4, @@ -224,6 +239,10 @@ func TestBeatConfig(t *testing.T) { FrontendConfig: &rumConfig{ Enabled: &truthy, RateLimit: 890, + EventRate: &eventRate{ + Limit: 300, + LruSize: 200, + }, SourceMapping: &SourceMapping{ Cache: &Cache{ Expiration: 4 * time.Second, @@ -236,8 +255,12 @@ func TestBeatConfig(t *testing.T) { beatVersion: "6.2.0", }, RumConfig: &rumConfig{ - Enabled: &truthy, - RateLimit: 10, + Enabled: &truthy, + RateLimit: 10, + EventRate: &eventRate{ + Limit: 300, + LruSize: 1000, + }, AllowOrigins: []string{"*"}, SourceMapping: &SourceMapping{ Cache: &Cache{ diff --git 
a/beater/common_handlers.go b/beater/common_handlers.go index 7b5fec74657..ffac551a010 100644 --- a/beater/common_handlers.go +++ b/beater/common_handlers.go @@ -27,12 +27,6 @@ import ( "strings" "time" - "github.com/gofrs/uuid" - "github.com/hashicorp/golang-lru" - "github.com/pkg/errors" - "github.com/ryanuber/go-glob" - "golang.org/x/time/rate" - "github.com/elastic/apm-server/decoder" "github.com/elastic/apm-server/processor" "github.com/elastic/apm-server/publish" @@ -42,12 +36,14 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/version" + "github.com/gofrs/uuid" + lru "github.com/hashicorp/golang-lru" + "github.com/pkg/errors" + "github.com/ryanuber/go-glob" + "golang.org/x/time/rate" ) const ( - rateLimitCacheSize = 1000 - rateLimitBurstMultiplier = 2 - supportedHeaders = "Content-Type, Content-Encoding, Accept" supportedMethods = "POST, OPTIONS" ) @@ -126,10 +122,11 @@ var ( counter: validateCounter, } } + rateLimitCounter = counter("response.errors.ratelimit") rateLimitedResponse = serverResponse{ err: errors.New("too many requests"), code: http.StatusTooManyRequests, - counter: counter("response.errors.ratelimit"), + counter: rateLimitCounter, } methodNotAllowedCounter = counter("response.errors.method") methodNotAllowedResponse = serverResponse{ @@ -172,7 +169,7 @@ func newMuxer(beaterConfig *Config, report publish.Reporter) *http.ServeMux { for path, route := range V2Routes { logger.Infof("Path %s added to request handler", path) - mux.Handle(path, route.Handler(beaterConfig, report)) + mux.Handle(path, route.Handler(path, beaterConfig, report)) } mux.Handle(rootURL, rootHandler(beaterConfig.SecretToken)) @@ -244,9 +241,11 @@ func rootHandler(secretToken string) http.Handler { return logHandler(handler) } -type contextKey string +type reqLoggerKey struct{} 
-const reqLoggerContextKey = contextKey("requestLogger") +func ContextWithReqLogger(ctx context.Context, rl *logp.Logger) context.Context { + return context.WithValue(ctx, reqLoggerKey{}, rl) +} func logHandler(h http.Handler) http.Handler { logger := logp.NewLogger("request") @@ -266,13 +265,8 @@ func logHandler(h http.Handler) http.Handler { "remote_address", utility.RemoteAddr(r), "user-agent", r.Header.Get("User-Agent")) - lr := r.WithContext( - context.WithValue(r.Context(), reqLoggerContextKey, reqLogger), - ) - lw := utility.NewRecordingResponseWriter(w) - - h.ServeHTTP(lw, lr) + h.ServeHTTP(lw, r.WithContext(ContextWithReqLogger(r.Context(), reqLogger))) if lw.Code <= 399 { reqLogger.Infow("handled request", []interface{}{"response_code", lw.Code}...) @@ -290,7 +284,7 @@ func requestTimeHandler(h http.Handler) http.Handler { // requestLogger is a convenience function to retrieve the logger that was // added to the request context by handler `logHandler`` func requestLogger(r *http.Request) *logp.Logger { - logger, ok := r.Context().Value(reqLoggerContextKey).(*logp.Logger) + logger, ok := r.Context().Value(reqLoggerKey{}).(*logp.Logger) if !ok { logger = logp.NewLogger("request") } @@ -307,6 +301,11 @@ func killSwitchHandler(killSwitch bool, h http.Handler) http.Handler { }) } +const ( + rateLimitCacheSize = 1000 + rateLimitBurstMultiplier = 2 +) + func ipRateLimitHandler(rateLimit int, h http.Handler) http.Handler { cache, _ := lru.New(rateLimitCacheSize) diff --git a/beater/config.go b/beater/config.go index dddc9a1a912..34688af136b 100644 --- a/beater/config.go +++ b/beater/config.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/go-ucfg" + "github.com/pkg/errors" ) const defaultPort = "8200" @@ -64,6 +65,7 @@ type ExpvarConfig struct { type rumConfig struct { Enabled *bool `config:"enabled"` RateLimit int `config:"rate_limit"` + EventRate 
*eventRate `config:"event_rate"` AllowOrigins []string `config:"allow_origins"` LibraryPattern string `config:"library_pattern"` ExcludeFromGrouping string `config:"exclude_from_grouping"` @@ -72,6 +74,11 @@ type rumConfig struct { beatVersion string } +type eventRate struct { + Limit int `config:"limit"` + LruSize int `config:"lru_size"` +} + type metricsConfig struct { Enabled *bool `config:"enabled"` } @@ -136,6 +143,24 @@ type InstrumentationConfig struct { SecretToken string `config:"secret_token"` } +func NewConfig(version string, ucfg *common.Config) (*Config, error) { + c := defaultConfig(version) + if err := ucfg.Unpack(c); err != nil { + return nil, errors.Wrap(err, "Error processing configuration") + } + + c.setRumConfig() + if c.RumConfig.isEnabled() { + if _, err := regexp.Compile(c.RumConfig.LibraryPattern); err != nil { + return nil, errors.New(fmt.Sprintf("Invalid regex for `library_pattern`: %v", err.Error())) + } + if _, err := regexp.Compile(c.RumConfig.ExcludeFromGrouping); err != nil { + return nil, errors.New(fmt.Sprintf("Invalid regex for `exclude_from_grouping`: %v", err.Error())) + } + } + return c, nil +} + func (c *Config) setSmapElasticsearch(esConfig *common.Config) { if c != nil && c.RumConfig.isEnabled() && c.RumConfig.SourceMapping != nil { c.RumConfig.SourceMapping.EsConfig = esConfig @@ -170,7 +195,7 @@ func (c *pipelineConfig) shouldOverwrite() bool { return c != nil && (c.Overwrite != nil && *c.Overwrite) } -func (c *Config) SetRumConfig() { +func (c *Config) setRumConfig() { if c.RumConfig != nil && c.RumConfig.Enabled != nil { return } @@ -210,7 +235,11 @@ func replaceVersion(pattern, version string) string { func defaultRum(beatVersion string) *rumConfig { return &rumConfig{ - RateLimit: 10, + RateLimit: 10, + EventRate: &eventRate{ + Limit: 300, + LruSize: 1000, + }, AllowOrigins: []string{"*"}, SourceMapping: &SourceMapping{ Cache: &Cache{ diff --git a/beater/config_test.go b/beater/config_test.go index 
3775eb7ad6a..816ef696197 100644 --- a/beater/config_test.go +++ b/beater/config_test.go @@ -54,6 +54,10 @@ func TestConfig(t *testing.T) { "rum": { "enabled": true, "rate_limit": 800, + "event_rate": { + "limit": 8000, + "lru_size": 2000, + }, "allow_origins": ["rum*"], "source_mapping": { "cache": { @@ -67,6 +71,10 @@ func TestConfig(t *testing.T) { "frontend": { "enabled": true, "rate_limit": 1000, + "event_rate": { + "limit": 1000, + "lru_size": 500, + }, "allow_origins": ["example*"], "source_mapping": { "cache": { @@ -97,8 +105,12 @@ func TestConfig(t *testing.T) { SecretToken: "1234random", SSL: &SSLConfig{Enabled: &truthy, Certificate: outputs.CertificateConfig{Certificate: "1234cert", Key: "1234key"}}, RumConfig: &rumConfig{ - Enabled: &truthy, - RateLimit: 800, + Enabled: &truthy, + RateLimit: 800, + EventRate: &eventRate{ + Limit: 8000, + LruSize: 2000, + }, AllowOrigins: []string{"rum*"}, SourceMapping: &SourceMapping{ Cache: &Cache{Expiration: 1 * time.Minute}, @@ -108,8 +120,12 @@ func TestConfig(t *testing.T) { ExcludeFromGrouping: "group_pattern-rum", }, FrontendConfig: &rumConfig{ - Enabled: &truthy, - RateLimit: 1000, + Enabled: &truthy, + RateLimit: 1000, + EventRate: &eventRate{ + Limit: 1000, + LruSize: 500, + }, AllowOrigins: []string{"example*"}, SourceMapping: &SourceMapping{ Cache: &Cache{Expiration: 10 * time.Minute}, @@ -172,6 +188,7 @@ func TestConfig(t *testing.T) { RumConfig: &rumConfig{ Enabled: nil, RateLimit: 0, + EventRate: nil, AllowOrigins: nil, SourceMapping: &SourceMapping{ IndexPattern: "", @@ -265,7 +282,7 @@ func TestIsRumEnabled(t *testing.T) { {c: &Config{RumConfig: &rumConfig{Enabled: &truthy}}, enabled: true}, {c: &Config{RumConfig: &rumConfig{Enabled: new(bool)}, FrontendConfig: &rumConfig{Enabled: &truthy}}, enabled: false}, } { - td.c.SetRumConfig() + td.c.setRumConfig() assert.Equal(t, td.enabled, td.c.RumConfig.isEnabled()) } @@ -301,7 +318,7 @@ func TestSetRum(t *testing.T) { {c: &Config{RumConfig: testRumConf, 
FrontendConfig: testFrontendConf}, rc: testRumConf}, } for _, test := range cases { - test.c.SetRumConfig() + test.c.setRumConfig() assert.Equal(t, test.rc, test.c.RumConfig) } } diff --git a/beater/rl_cache.go b/beater/rl_cache.go new file mode 100644 index 00000000000..4fd55a527e0 --- /dev/null +++ b/beater/rl_cache.go @@ -0,0 +1,88 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "sync" + + "github.com/hashicorp/golang-lru/simplelru" + "github.com/pkg/errors" + "golang.org/x/time/rate" +) + +const burstMultiplier = 3 + +// The rlCache is a simple lru cache holding N=size rate limiter entities. Every +// rate limiter entity allows N=rateLimit hits per key (=IP) per second, and has a +// burst queue of limit*3 (see burstMultiplier). +// As the used lru cache is of a fixed size, cache entries can get evicted, in +// which case the evicted limiter is reused as the new rate limiter for the +// current key. This adds a certain random factor to the rate limiting in case the +// cache is full. The purpose is to avoid bypassing the rate limiting by sending +// requests from cache_size*2 unique keys, which would lead to evicted keys and +// the creation of new rate limiter entities with full allowance. 
+type rlCache struct { + cache *simplelru.LRU + limit int + + mu sync.Mutex //guards limiter in cache + evictedLimiter *rate.Limiter +} + +func NewRlCache(size, rateLimit int) (*rlCache, error) { + if size <= 0 || rateLimit < 0 { + return nil, errors.New("cache initialization: size and rateLimit must be greater than zero") + } + + rlc := rlCache{limit: rateLimit} + + var onEvicted = func(_ interface{}, value interface{}) { + rlc.evictedLimiter = *value.(**rate.Limiter) + } + + c, err := simplelru.NewLRU(size, simplelru.EvictCallback(onEvicted)) + if err != nil { + return nil, err + } + rlc.cache = c + return &rlc, nil +} + +func (rlc *rlCache) getRateLimiter(key string) *rate.Limiter { + // fetch the rate limiter from the cache, if a cache is given + if rlc.cache == nil || rlc.limit == -1 { + return nil + } + + // lock get and add action for cache to allow proper eviction handling without + // race conditions. + rlc.mu.Lock() + defer rlc.mu.Unlock() + + if l, ok := rlc.cache.Get(key); ok { + return *l.(**rate.Limiter) + } + + var limiter *rate.Limiter + if evicted := rlc.cache.Add(key, &limiter); evicted { + limiter = rlc.evictedLimiter + } else { + limiter = rate.NewLimiter(rate.Limit(rlc.limit), rlc.limit*burstMultiplier) + } + return limiter +} diff --git a/beater/rl_cache_test.go b/beater/rl_cache_test.go new file mode 100644 index 00000000000..886c7e95ae5 --- /dev/null +++ b/beater/rl_cache_test.go @@ -0,0 +1,73 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCacheInitFails(t *testing.T) { + for _, test := range []struct { + size int + limit int + }{ + {-1, 1}, + {0, 1}, + {1, -1}, + } { + c, err := NewRlCache(test.size, test.limit) + assert.Error(t, err) + assert.Nil(t, c) + } +} + +func TestCacheEviction(t *testing.T) { + cache_size := 2 + limit := 1 //multiplied times burstMultiplier 3 + + rlc, err := NewRlCache(cache_size, limit) + require.NoError(t, err) + + // add new limiter + rl_a := rlc.getRateLimiter("a") + rl_a.AllowN(time.Now(), 3) + + // add new limiter + rl_b := rlc.getRateLimiter("b") + rl_b.AllowN(time.Now(), 2) + + // reuse evicted limiter rl_a + rl_c := rlc.getRateLimiter("c") + assert.False(t, rl_c.Allow()) + assert.Equal(t, rl_c, rlc.evictedLimiter) + + // reuse evicted limiter rl_b + rl_d := rlc.getRateLimiter("a") + assert.True(t, rl_d.Allow()) + assert.False(t, rl_d.Allow()) + assert.Equal(t, rl_d, rlc.evictedLimiter) + // check that limiters are independent + assert.True(t, rl_d != rl_c) + rlc.evictedLimiter = nil + assert.NotNil(t, rl_d) + assert.NotNil(t, rl_c) +} diff --git a/beater/route_config.go b/beater/route_config.go index 6682e83deed..bdad837dce3 100644 --- a/beater/route_config.go +++ b/beater/route_config.go @@ -21,12 +21,11 @@ import ( "net/http" "regexp" - "github.com/elastic/apm-server/processor/stream" - "github.com/elastic/apm-server/processor" perr 
"github.com/elastic/apm-server/processor/error" "github.com/elastic/apm-server/processor/metric" "github.com/elastic/apm-server/processor/sourcemap" + "github.com/elastic/apm-server/processor/stream" "github.com/elastic/apm-server/processor/transaction" "github.com/elastic/apm-server/publish" @@ -79,16 +78,25 @@ var V1Routes = map[string]v1Route{ var V2Routes = map[string]v2Route{ V2BackendURL: v2BackendRoute, - V2RumURL: {rumRouteType}, + V2RumURL: v2RumRoute, } -var v2BackendRoute = v2Route{ - routeType{ - v2backendHandler, - systemMetadataDecoder, - func(*Config) transform.Config { return transform.Config{} }, - }, -} +var ( + v2BackendRoute = v2Route{ + routeType{ + v2backendHandler, + systemMetadataDecoder, + func(*Config) transform.Config { return transform.Config{} }, + }, + } + v2RumRoute = v2Route{ + routeType{ + v2rumHandler, + userMetaDataDecoder, + rumTransformConfig, + }, + } +) var ( backendRouteType = routeType{ @@ -127,6 +135,12 @@ func v2backendHandler(beaterConfig *Config, h http.Handler) http.Handler { authHandler(beaterConfig.SecretToken, h))) } +func v2rumHandler(beaterConfig *Config, h http.Handler) http.Handler { + return killSwitchHandler(beaterConfig.RumConfig.isEnabled(), + requestTimeHandler( + corsHandler(beaterConfig.RumConfig.AllowOrigins, h))) +} + func backendHandler(beaterConfig *Config, h http.Handler) http.Handler { return logHandler( requestTimeHandler( @@ -198,16 +212,24 @@ type v2Route struct { routeType } -func (v v2Route) Handler(beaterConfig *Config, report publish.Reporter) http.Handler { +func (v v2Route) Handler(url string, c *Config, report publish.Reporter) http.Handler { reqDecoder := v.configurableDecoder( - beaterConfig, + c, func(*http.Request) (map[string]interface{}, error) { return map[string]interface{}{}, nil }, ) v2Handler := v2Handler{ requestDecoder: reqDecoder, - streamProcessor: &stream.StreamProcessor{Tconfig: v.transformConfig(beaterConfig)}, + 
streamProcessor: &stream.StreamProcessor{Tconfig: v.transformConfig(c)}, + } + + if url == V2RumURL { + if rlc, err := NewRlCache(c.RumConfig.EventRate.LruSize, c.RumConfig.EventRate.Limit); err == nil { + v2Handler.rlc = rlc + } else { + logp.NewLogger("handler").Error(err.Error()) + } } - return v.wrappingHandler(beaterConfig, v2Handler.Handle(beaterConfig, report)) + return v.wrappingHandler(c, v2Handler.Handle(c, report)) } diff --git a/beater/v2_handler.go b/beater/v2_handler.go index b31683c4441..765a8b16933 100644 --- a/beater/v2_handler.go +++ b/beater/v2_handler.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/apm-server/processor/stream" "github.com/elastic/apm-server/publish" + "github.com/elastic/apm-server/utility" "github.com/elastic/beats/libbeat/logp" @@ -34,6 +35,7 @@ import ( type v2Handler struct { requestDecoder decoder.ReqDecoder streamProcessor *stream.StreamProcessor + rlc *rlCache } func (v *v2Handler) statusCode(sr *stream.Result) (int, *monitoring.Int) { @@ -58,6 +60,9 @@ func (v *v2Handler) statusCode(sr *stream.Result) (int, *monitoring.Int) { case stream.ShuttingDownErrType: code = http.StatusServiceUnavailable ct = serverShuttingDownCounter + case stream.RateLimitErrType: + code = http.StatusTooManyRequests + ct = rateLimitCounter default: code = http.StatusInternalServerError ct = responseErrorsOthers @@ -111,12 +116,27 @@ func (v *v2Handler) Handle(beaterConfig *Config, report publish.Reporter) http.H return } + ctx := r.Context() + if v.rlc != nil { + rl := v.rlc.getRateLimiter(utility.RemoteAddr(r)) + if !rl.Allow() { + sr := stream.Result{} + sr.Add(&stream.Error{ + Type: stream.RateLimitErrType, + Message: "rate limit exceeded", + }) + v.sendResponse(logger, w, &sr) + return + } + ctx = stream.ContextWithRateLimiter(ctx, rl) + } + ndReader, err := decoder.NDJSONStreamDecodeCompressedWithLimit(r, beaterConfig.MaxEventSize) if err != nil { // if we can't set up the ndjsonreader, // we won't be able 
to make sense of the body sr := stream.Result{} - sr.LimitedAdd(&stream.Error{ + sr.Add(&stream.Error{ Type: stream.InvalidInputErrType, Message: err.Error(), }) @@ -127,12 +147,11 @@ func (v *v2Handler) Handle(beaterConfig *Config, report publish.Reporter) http.H reqMeta, err := v.requestDecoder(r) if err != nil { sr := stream.Result{} - sr.LimitedAdd(err) + sr.Add(err) v.sendResponse(logger, w, &sr) return } - - res := v.streamProcessor.HandleStream(r.Context(), reqMeta, ndReader, report) + res := v.streamProcessor.HandleStream(ctx, reqMeta, ndReader, report) v.sendResponse(logger, w, res) }) diff --git a/beater/v2_handler_test.go b/beater/v2_handler_test.go index 96faa1ebece..b3e09908ab1 100644 --- a/beater/v2_handler_test.go +++ b/beater/v2_handler_test.go @@ -25,10 +25,10 @@ import ( "path/filepath" "testing" - "github.com/elastic/beats/libbeat/monitoring" - "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/tests" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/monitoring" "github.com/pkg/errors" "github.com/stretchr/testify/assert" @@ -44,7 +44,7 @@ func TestInvalidContentType(t *testing.T) { w := httptest.NewRecorder() c := defaultConfig("7.0.0") - handler := (&v2BackendRoute).Handler(c, nil) + handler := (&v2BackendRoute).Handler("", c, nil) handler.ServeHTTP(w, req) @@ -58,7 +58,7 @@ func TestEmptyRequest(t *testing.T) { w := httptest.NewRecorder() c := defaultConfig("7.0.0") - handler := (&v2BackendRoute).Handler(c, nil) + handler := (&v2BackendRoute).Handler("", c, nil) handler.ServeHTTP(w, req) @@ -84,7 +84,7 @@ func TestRequestDecoderError(t *testing.T) { }, } - handler := testRouteWithFaultyDecoder.Handler(c, nil) + handler := testRouteWithFaultyDecoder.Handler("", c, nil) handler.ServeHTTP(w, req) @@ -110,25 +110,16 @@ func TestRequestIntegration(t *testing.T) { {name: "FullQueue", code: http.StatusServiceUnavailable, path: "errors.ndjson", 
reportingErr: publish.ErrFull, counter: fullQueueCounter}, } { t.Run(test.name, func(t *testing.T) { - b, err := loader.LoadDataAsBytes(filepath.Join("../testdata/intake-v2/", test.path)) - require.NoError(t, err) - bodyReader := bytes.NewBuffer(b) - - req := httptest.NewRequest("POST", V2BackendURL, bodyReader) - req.Header.Add("Content-Type", "application/x-ndjson") - - w := httptest.NewRecorder() - - c := defaultConfig("7.0.0") - report := func(context.Context, publish.PendingReq) error { - return test.reportingErr - } - handler := (&v2BackendRoute).Handler(c, report) - ctSuccess := responseSuccesses.Get() ctFailure := responseErrors.Get() ct := test.counter.Get() - handler.ServeHTTP(w, req) + + w, err := sendReq(defaultConfig("7.0.0"), + &v2BackendRoute, + V2BackendURL, + filepath.Join("../testdata/intake-v2/", test.path), + test.reportingErr) + require.NoError(t, err) assert.Equal(t, test.code, w.Code, w.Body.String()) assert.Equal(t, ct+1, test.counter.Get()) @@ -138,7 +129,7 @@ func TestRequestIntegration(t *testing.T) { assert.Equal(t, ctSuccess+1, responseSuccesses.Get()) assert.Equal(t, ctFailure, responseErrors.Get()) } else { - assert.Equal(t, w.HeaderMap.Get("Content-Type"), "application/json") + assert.Equal(t, "application/json", w.HeaderMap.Get("Content-Type")) assert.Equal(t, ctSuccess, responseSuccesses.Get()) assert.Equal(t, ctFailure+1, responseErrors.Get()) @@ -149,11 +140,56 @@ func TestRequestIntegration(t *testing.T) { } } +func TestRequestIntegrationRUM(t *testing.T) { + for _, test := range []struct { + name string + code int + path string + }{ + {name: "Success", code: http.StatusAccepted, path: "../testdata/intake-v2/errors.ndjson"}, + {name: "RateLimit", code: http.StatusTooManyRequests, path: "../testdata/intake-v2/heavy.ndjson"}, + } { + t.Run(test.name, func(t *testing.T) { + + ucfg, err := common.NewConfigFrom(m{"rum": m{"enabled": true, "event_rate": m{"limit": 9}}}) + require.NoError(t, err) + c, err := NewConfig("7.0.0", ucfg) + 
require.NoError(t, err) + w, err := sendReq(c, &v2RumRoute, V2RumURL, test.path, nil) + require.NoError(t, err) + + require.Equal(t, test.code, w.Code, w.Body.String()) + if test.code != http.StatusAccepted { + body := w.Body.Bytes() + tests.AssertApproveResult(t, "approved-stream-result/TestRequestIntegrationRum"+test.name, body) + } + }) + } +} + +func sendReq(c *Config, route *v2Route, url string, p string, repErr error) (*httptest.ResponseRecorder, error) { + b, err := loader.LoadDataAsBytes(p) + if err != nil { + return nil, err + } + req := httptest.NewRequest("POST", url, bytes.NewBuffer(b)) + req.Header.Add("Content-Type", "application/x-ndjson") + + report := func(context.Context, publish.PendingReq) error { + return repErr + } + handler := route.Handler(url, c, report) + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + return w, nil +} + func TestV2WrongMethod(t *testing.T) { req := httptest.NewRequest("GET", "/intake/v2/events", nil) req.Header.Add("Content-Type", "application/x-ndjson") w := httptest.NewRecorder() - handler := (&v2BackendRoute).Handler(defaultConfig("7.0.0"), nil) + handler := (&v2BackendRoute).Handler("", defaultConfig("7.0.0"), nil) ct := methodNotAllowedCounter.Get() handler.ServeHTTP(w, req) @@ -186,7 +222,7 @@ func TestV2LineExceeded(t *testing.T) { } c := defaultConfig("7.0.0") assert.False(t, lineLimitExceededInTestData(c.MaxEventSize)) - handler := (&v2BackendRoute).Handler(c, report) + handler := (&v2BackendRoute).Handler("", c, report) handler.ServeHTTP(w, req) assert.Equal(t, http.StatusAccepted, w.Code, w.Body.String()) @@ -194,7 +230,7 @@ func TestV2LineExceeded(t *testing.T) { c.MaxEventSize = 20 assert.True(t, lineLimitExceededInTestData(c.MaxEventSize)) - handler = (&v2BackendRoute).Handler(c, report) + handler = (&v2BackendRoute).Handler("", c, report) req = httptest.NewRequest("POST", "/v2/intake", bytes.NewBuffer(b)) req.Header.Add("Content-Type", "application/x-ndjson") diff --git 
a/processor/sourcemap/package_tests/attrs_test.go b/processor/sourcemap/package_tests/attrs_test.go index a0bd2f479f1..51d70da9c20 100644 --- a/processor/sourcemap/package_tests/attrs_test.go +++ b/processor/sourcemap/package_tests/attrs_test.go @@ -56,6 +56,7 @@ func TestKeywordLimitationOnSourcemapAttributes(t *testing.T) { "sourcemap.service.version": "service_version", "sourcemap.bundle_filepath": "bundle_filepath", } + procSetup.KeywordLimitation(t, tests.NewSet(), mapping) } diff --git a/processor/stream/approved-es-documents/testV2IntakeIntegrationMixedMinimalProcess.approved.json b/processor/stream/approved-es-documents/testV2IntakeIntegrationEvents.approved.json similarity index 100% rename from processor/stream/approved-es-documents/testV2IntakeIntegrationMixedMinimalProcess.approved.json rename to processor/stream/approved-es-documents/testV2IntakeIntegrationEvents.approved.json diff --git a/processor/stream/approved-stream-result/testIntegrationResultMixedMinimalProcess.approved.json b/processor/stream/approved-stream-result/testIntegrationResultEvents.approved.json similarity index 100% rename from processor/stream/approved-stream-result/testIntegrationResultMixedMinimalProcess.approved.json rename to processor/stream/approved-stream-result/testIntegrationResultEvents.approved.json diff --git a/processor/stream/approved-stream-result/testIntegrationResultLimiterAllowAll.approved.json b/processor/stream/approved-stream-result/testIntegrationResultLimiterAllowAll.approved.json new file mode 100644 index 00000000000..c612a4faf20 --- /dev/null +++ b/processor/stream/approved-stream-result/testIntegrationResultLimiterAllowAll.approved.json @@ -0,0 +1,3 @@ +{ + "accepted": 19 +} diff --git a/processor/stream/approved-stream-result/testIntegrationResultLimiterDeny.approved.json b/processor/stream/approved-stream-result/testIntegrationResultLimiterDeny.approved.json new file mode 100644 index 00000000000..8333c5a198a --- /dev/null +++ 
b/processor/stream/approved-stream-result/testIntegrationResultLimiterDeny.approved.json @@ -0,0 +1,8 @@ +{ + "accepted": 10, + "errors": [ + { + "message": "rate limit exceeded" + } + ] +} diff --git a/processor/stream/approved-stream-result/testIntegrationResultLimiterDenyAll.approved.json b/processor/stream/approved-stream-result/testIntegrationResultLimiterDenyAll.approved.json new file mode 100644 index 00000000000..507b15773aa --- /dev/null +++ b/processor/stream/approved-stream-result/testIntegrationResultLimiterDenyAll.approved.json @@ -0,0 +1,8 @@ +{ + "accepted": 0, + "errors": [ + { + "message": "rate limit exceeded" + } + ] +} diff --git a/processor/stream/approved-stream-result/testIntegrationResultLimiterPartiallyUsedLimitAllow.approved.json b/processor/stream/approved-stream-result/testIntegrationResultLimiterPartiallyUsedLimitAllow.approved.json new file mode 100644 index 00000000000..c612a4faf20 --- /dev/null +++ b/processor/stream/approved-stream-result/testIntegrationResultLimiterPartiallyUsedLimitAllow.approved.json @@ -0,0 +1,3 @@ +{ + "accepted": 19 +} diff --git a/processor/stream/approved-stream-result/testIntegrationResultLimiterPartiallyUsedLimitDeny.approved.json b/processor/stream/approved-stream-result/testIntegrationResultLimiterPartiallyUsedLimitDeny.approved.json new file mode 100644 index 00000000000..8333c5a198a --- /dev/null +++ b/processor/stream/approved-stream-result/testIntegrationResultLimiterPartiallyUsedLimitDeny.approved.json @@ -0,0 +1,8 @@ +{ + "accepted": 10, + "errors": [ + { + "message": "rate limit exceeded" + } + ] +} diff --git a/processor/stream/approved-stream-result/testIntegrationResultNoLimiter.approved.json b/processor/stream/approved-stream-result/testIntegrationResultNoLimiter.approved.json new file mode 100644 index 00000000000..c612a4faf20 --- /dev/null +++ b/processor/stream/approved-stream-result/testIntegrationResultNoLimiter.approved.json @@ -0,0 +1,3 @@ +{ + "accepted": 19 +} diff --git 
a/processor/stream/benchmark_test.go b/processor/stream/benchmark_test.go index 56b669a9974..d2a3a4f2b5b 100644 --- a/processor/stream/benchmark_test.go +++ b/processor/stream/benchmark_test.go @@ -22,10 +22,13 @@ import ( "context" "errors" "io/ioutil" + "math" "path/filepath" "runtime" "testing" + r "golang.org/x/time/rate" + "github.com/elastic/apm-server/decoder" "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/tests/loader" @@ -44,7 +47,8 @@ func BenchmarkStreamProcessor(b *testing.B) { if err != nil { b.Error(err) } - ctx := context.Background() + //ensure to not hit rate limit as blocking wait would be measured otherwise + ctx := ContextWithRateLimiter(context.Background(), r.NewLimiter(r.Limit(math.MaxFloat64-1), math.MaxInt32)) sp := &StreamProcessor{} for _, f := range files { b.Run(f.Name(), func(b *testing.B) { diff --git a/processor/stream/result.go b/processor/stream/result.go index e6abc34c3bf..7245ad63371 100644 --- a/processor/stream/result.go +++ b/processor/stream/result.go @@ -46,6 +46,7 @@ const ( ShuttingDownErrType ServerErrType MethodForbiddenErrType + RateLimitErrType ) const ( diff --git a/processor/stream/stream_processor.go b/processor/stream/stream_processor.go index ec795f2c13c..de02567f507 100644 --- a/processor/stream/stream_processor.go +++ b/processor/stream/stream_processor.go @@ -21,9 +21,12 @@ import ( "context" "errors" "io" + "time" "github.com/santhosh-tekuri/jsonschema" + "golang.org/x/time/rate" + "github.com/elastic/apm-server/decoder" er "github.com/elastic/apm-server/model/error" "github.com/elastic/apm-server/model/metadata" @@ -36,6 +39,19 @@ import ( "github.com/elastic/apm-server/validation" ) +type rateLimiterKey struct{} + +func ContextWithRateLimiter(ctx context.Context, limiter *rate.Limiter) context.Context { + return context.WithValue(ctx, rateLimiterKey{}, limiter) +} + +func rateLimiterFromContext(ctx context.Context) 
*rate.Limiter { + if lim, ok := ctx.Value(rateLimiterKey{}).(*rate.Limiter); ok { + return lim + } + return nil +} + var ( ErrUnrecognizedObject = errors.New("did not recognize object type") ) @@ -174,12 +190,29 @@ func (v *StreamProcessor) handleRawModel(rawModel map[string]interface{}) (trans // readBatch will read up to `batchSize` objects from the ndjson stream // it returns a slice of eventables and a bool that indicates if there might be more to read. -func (s *StreamProcessor) readBatch(batchSize int, reader StreamReader, response *Result) ([]transform.Transformable, bool) { - var err error - var rawModel map[string]interface{} +func (s *StreamProcessor) readBatch(ctx context.Context, rl *rate.Limiter, batchSize int, reader StreamReader, response *Result) ([]transform.Transformable, bool) { + var ( + err error + rawModel map[string]interface{} + eventables []transform.Transformable + ) + + if rl != nil { + // use provided rate limiter to throttle batch read + ctxT, cancel := context.WithTimeout(ctx, time.Second) + err = rl.WaitN(ctxT, batchSize) + cancel() + if err != nil { + response.Add(&Error{ + Type: RateLimitErrType, + Message: "rate limit exceeded", + }) + return eventables, true + } + } - var eventables []transform.Transformable for i := 0; i < batchSize && err == nil; i++ { + rawModel, err = reader.Read() if err != nil && err != io.EOF { @@ -227,9 +260,11 @@ func (s *StreamProcessor) HandleStream(ctx context.Context, meta map[string]inte Config: s.Tconfig, Metadata: *metadata, } + rl := rateLimiterFromContext(ctx) for { - transformables, done := s.readBatch(batchSize, jsonReader, res) + + transformables, done := s.readBatch(ctx, rl, batchSize, jsonReader, res) if transformables != nil { err := report(ctx, publish.PendingReq{ Transformables: transformables, diff --git a/processor/stream/stream_processor_test.go b/processor/stream/stream_processor_test.go index bf4802acfbb..eb4f6f0887d 100644 --- a/processor/stream/stream_processor_test.go +++ 
b/processor/stream/stream_processor_test.go @@ -27,6 +27,8 @@ import ( "testing/iotest" "time" + "golang.org/x/time/rate" + "github.com/elastic/apm-server/decoder" "github.com/elastic/apm-server/tests" "github.com/elastic/apm-server/tests/loader" @@ -122,9 +124,9 @@ func TestIntegration(t *testing.T) { {path: "transactions.ndjson", name: "Transactions"}, {path: "spans.ndjson", name: "Spans"}, {path: "metrics.ndjson", name: "Metrics"}, - {path: "minimal_process.ndjson", name: "MixedMinimalProcess"}, - {path: "minimal_service.ndjson", name: "MinimalService"}, - {path: "metadata_null_values.ndjson", name: "MetadataNullValues"}, + {path: "events.ndjson", name: "Events"}, + {path: "minimal-service.ndjson", name: "MinimalService"}, + {path: "metadata-null-values.ndjson", name: "MetadataNullValues"}, {path: "invalid-event.ndjson", name: "InvalidEvent"}, {path: "invalid-json-event.ndjson", name: "InvalidJSONEvent"}, {path: "invalid-json-metadata.ndjson", name: "InvalidJSONMetadata"}, @@ -156,3 +158,34 @@ func TestIntegration(t *testing.T) { }) } } + +func TestRateLimiting(t *testing.T) { + report := func(ctx context.Context, p publish.PendingReq) error { + for range p.Transformables { + } + return nil + } + + b, err := loader.LoadDataAsBytes("../testdata/intake-v2/ratelimit.ndjson") + require.NoError(t, err) + for _, test := range []struct { + name string + lim *rate.Limiter + hit int + }{ + {name: "NoLimiter"}, + {name: "LimiterDenyAll", lim: rate.NewLimiter(rate.Limit(0), 2)}, + {name: "LimiterAllowAll", lim: rate.NewLimiter(rate.Limit(40), 40*5)}, + {name: "LimiterPartiallyUsedLimitAllow", lim: rate.NewLimiter(rate.Limit(10), 10*2), hit: 10}, + {name: "LimiterPartiallyUsedLimitDeny", lim: rate.NewLimiter(rate.Limit(7), 7*2), hit: 10}, + {name: "LimiterDeny", lim: rate.NewLimiter(rate.Limit(6), 6*2)}, + } { + reader := decoder.NewNDJSONStreamReader(bytes.NewReader(b), 100*1024) + if test.hit > 0 { + assert.True(t, test.lim.AllowN(time.Now(), 
test.hit)) + } + ctx := ContextWithRateLimiter(context.Background(), test.lim) + actualResult := (&StreamProcessor{}).HandleStream(ctx, map[string]interface{}{}, reader, report) + assertApproveResult(t, actualResult, test.name) + } +} diff --git a/testdata/intake-v2/minimal_process.ndjson b/testdata/intake-v2/events.ndjson similarity index 100% rename from testdata/intake-v2/minimal_process.ndjson rename to testdata/intake-v2/events.ndjson diff --git a/testdata/intake-v2/metadata_null_values.ndjson b/testdata/intake-v2/metadata-null-values.ndjson similarity index 100% rename from testdata/intake-v2/metadata_null_values.ndjson rename to testdata/intake-v2/metadata-null-values.ndjson diff --git a/testdata/intake-v2/minimal_service.ndjson b/testdata/intake-v2/minimal-service.ndjson similarity index 100% rename from testdata/intake-v2/minimal_service.ndjson rename to testdata/intake-v2/minimal-service.ndjson diff --git a/testdata/intake-v2/only-metadata.ndjson b/testdata/intake-v2/only-metadata.ndjson new file mode 100644 index 00000000000..5714fb45eed --- /dev/null +++ b/testdata/intake-v2/only-metadata.ndjson @@ -0,0 +1 @@ +{"metadata": {"process": {"ppid": 6789, "pid": 1234, "argv": ["node", "server.js"], "title": "node"}, "system": {"platform": "darwin", "hostname": "prod1.example.com", "architecture": "x64"}, "service": {"name": "1234_service-12a3", "language": {"version": "8", "name": "ecmascript"}, "agent": {"version": "3.14.0", "name": "elastic-node"}, "environment": "staging", "framework": {"version": "1.2.3", "name": "Express"}, "version": "5.1.3", "runtime": {"version": "8.0.0", "name": "node"}}}} diff --git a/testdata/intake-v2/ratelimit.ndjson b/testdata/intake-v2/ratelimit.ndjson new file mode 100644 index 00000000000..8d48399ed13 --- /dev/null +++ b/testdata/intake-v2/ratelimit.ndjson @@ -0,0 +1,20 @@ +{"metadata": { "process": {"pid": 1234 }, "service": {"name": "1234_service-12a3", "language": {"name": "ecmascript"}, "agent": {"version": "3.14.0", 
"name": "elastic-node"}}}} +{ "error": {"id": "abcdef0123456789", "timestamp": "2018-08-09T15:04:05.999Z","log": {"level": "custom log level","message": "Cannot read property 'baz' of undefined"}}} +{ "span": { "id": "0123456a89012345", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "ab23456a89012345", "transaction_id": "ab23456a89012345", "parent": 1, "name": "GET /api/types", "type": "request", "start": 1.845, "duration": 3.5642981, "stacktrace": [], "context": {} }} +{ "transaction": { "trace_id": "01234567890123456789abcdefabcdef", "id": "abcdef1478523690", "type": "request", "duration": 32.592981, "timestamp": "2018-08-30T18:53:27.154Z", "result": "200", "context": null, "spans": null, "sampled": null, "span_count": { "started": 0 }}} +{ "metricset": { "samples": { "go.memstats.heap.sys.bytes": { "value": 61235 } }, "timestamp": "2017-05-30T18:53:42.281Z" }} +{ "error": {"id": "abcdef0123456789", "timestamp": "2018-08-09T15:04:05.999Z","log": {"level": "custom log level","message": "Cannot read property 'baz' of undefined"}}} +{ "span": { "id": "0123456a89012345", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "ab23456a89012345", "transaction_id": "ab23456a89012345", "parent": 1, "name": "GET /api/types", "type": "request", "start": 1.845, "duration": 3.5642981, "stacktrace": [], "context": {} }} +{ "transaction": { "trace_id": "01234567890123456789abcdefabcdef", "id": "abcdef1478523690", "type": "request", "duration": 32.592981, "timestamp": "2018-08-30T18:53:27.154Z", "result": "200", "context": null, "spans": null, "sampled": null, "span_count": { "started": 0 }}} +{ "metricset": { "samples": { "go.memstats.heap.sys.bytes": { "value": 61235 } }, "timestamp": "2017-05-30T18:53:42.281Z" }} +{ "error": {"id": "abcdef0123456789", "timestamp": "2018-08-09T15:04:05.999Z","log": {"level": "custom log level","message": "Cannot read property 'baz' of undefined"}}} +{ "span": { "id": "0123456a89012345", "trace_id": 
"0123456789abcdef0123456789abcdef", "parent_id": "ab23456a89012345", "transaction_id": "ab23456a89012345", "parent": 1, "name": "GET /api/types", "type": "request", "start": 1.845, "duration": 3.5642981, "stacktrace": [], "context": {} }} +{ "transaction": { "trace_id": "01234567890123456789abcdefabcdef", "id": "abcdef1478523690", "type": "request", "duration": 32.592981, "timestamp": "2018-08-30T18:53:27.154Z", "result": "200", "context": null, "spans": null, "sampled": null, "span_count": { "started": 0 }}} +{ "metricset": { "samples": { "go.memstats.heap.sys.bytes": { "value": 61235 } }, "timestamp": "2017-05-30T18:53:42.281Z" }} +{ "error": {"id": "abcdef0123456789", "timestamp": "2018-08-09T15:04:05.999Z","log": {"level": "custom log level","message": "Cannot read property 'baz' of undefined"}}} +{ "span": { "id": "0123456a89012345", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "ab23456a89012345", "transaction_id": "ab23456a89012345", "parent": 1, "name": "GET /api/types", "type": "request", "start": 1.845, "duration": 3.5642981, "stacktrace": [], "context": {} }} +{ "transaction": { "trace_id": "01234567890123456789abcdefabcdef", "id": "abcdef1478523690", "type": "request", "duration": 32.592981, "timestamp": "2018-08-30T18:53:27.154Z", "result": "200", "context": null, "spans": null, "sampled": null, "span_count": { "started": 0 }}} +{ "metricset": { "samples": { "go.memstats.heap.sys.bytes": { "value": 61235 } }, "timestamp": "2017-05-30T18:53:42.281Z" }} +{ "error": {"id": "abcdef0123456789", "timestamp": "2018-08-09T15:04:05.999Z","log": {"level": "custom log level","message": "Cannot read property 'baz' of undefined"}}} +{ "span": { "id": "0123456a89012345", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "ab23456a89012345", "transaction_id": "ab23456a89012345", "parent": 1, "name": "GET /api/types", "type": "request", "start": 1.845, "duration": 3.5642981, "stacktrace": [], "context": {} }} +{ "transaction": { "trace_id": 
"01234567890123456789abcdefabcdef", "id": "abcdef1478523690", "type": "request", "duration": 32.592981, "timestamp": "2018-08-30T18:53:27.154Z", "result": "200", "context": null, "spans": null, "sampled": null, "span_count": { "started": 0 }}} diff --git a/tests/system/apmserver.py b/tests/system/apmserver.py index 511450a6e92..d1609aca130 100644 --- a/tests/system/apmserver.py +++ b/tests/system/apmserver.py @@ -51,11 +51,11 @@ def get_transaction_payload_path(self, name="payload.json"): def get_metrics_payload_path(self, name="payload.json"): return self.get_payload_path("metric", name) - def get_transaction_v2_payload(self): - with open(self.get_transaction_v2_payload_path()) as f: + def get_event_v2_payload(self, name="events.ndjson"): + with open(self.get_event_v2_payload_path(name=name)) as f: return f.read() - def get_transaction_v2_payload_path(self, name="transactions.ndjson"): + def get_event_v2_payload_path(self, name="events.ndjson"): return self._beat_path_join( 'testdata', 'intake-v2', @@ -276,6 +276,8 @@ class ClientSideBaseTest(ServerBaseTest): errors_url = 'http://localhost:8200/v1/rum/errors' sourcemap_url = 'http://localhost:8200/assets/v1/sourcemaps' + intake_v2_url = 'http://localhost:8200/intake/v2/rum/events' + @classmethod def setUpClass(cls): super(ClientSideBaseTest, cls).setUpClass() diff --git a/tests/system/config/apm-server.yml.j2 b/tests/system/config/apm-server.yml.j2 index 0a0a44a53ed..4725064589a 100644 --- a/tests/system/config/apm-server.yml.j2 +++ b/tests/system/config/apm-server.yml.j2 @@ -13,6 +13,7 @@ apm-server: rum.allow_origins: {{ allow_origins }} rum.library_pattern: "~/test|library" rum.exclude_from_grouping: "~/test" + rum.event_rate.limit: 16 {% if smap_cache_expiration%} rum.source_mapping.cache.expiration: {{ smap_cache_expiration}} diff --git a/tests/system/test_access.py b/tests/system/test_access.py index 273f4f221ab..792463abc41 100644 --- a/tests/system/test_access.py +++ b/tests/system/test_access.py @@ -39,7 
+39,7 @@ def test_with_token_v2(self): """ url = 'http://localhost:8200/intake/v2/events' - transactions = self.get_transaction_v2_payload() + transactions = self.get_event_v2_payload(name="transactions.ndjson") headers = {'content-type': 'application/x-ndjson'} def oauth(v): diff --git a/tests/system/test_requests.py b/tests/system/test_requests.py index a65cb2b298d..95bd33938ef 100644 --- a/tests/system/test_requests.py +++ b/tests/system/test_requests.py @@ -218,7 +218,6 @@ def test_preflight_bad_headers(self): class RateLimitTest(ClientSideBaseTest): def test_rate_limit(self): - transactions = self.get_transaction_payload() threads = [] codes = defaultdict(int) @@ -244,7 +243,6 @@ def fire(): assert fire() == 202 def test_rate_limit_multiple_ips(self): - transactions = self.get_transaction_payload() threads = [] codes = defaultdict(int) @@ -272,3 +270,77 @@ def fire(x): time.sleep(1) assert fire(0) == 202 assert fire(1) == 202 + + +class RateLimitV2Test(ClientSideBaseTest): + + def fire_events(self, data_file, iterations, split_ips=False): + transactions = self.get_event_v2_payload(name=data_file) + headers = {'content-type': 'application/x-ndjson'} + threads = [] + codes = defaultdict(int) + + def fire(x): + ip = '10.11.12.13' + if split_ips and x % 2: + ip = '10.11.12.14' + r = requests.post(self.intake_v2_url, + data=transactions, + headers={'content-type': 'application/x-ndjson', + 'X-Forwarded-For': ip}) + codes[r.status_code] += 1 + return r.status_code + + # rate limit hit, because every event in request is counted + for x in range(iterations): + threads.append(threading.Thread(target=fire, args=(x,))) + + for t in threads: + t.start() + time.sleep(0.01) + + for t in threads: + t.join() + return codes + + # limit: 16, burst_multiplier: 3, burst: 48 + def test_rate_limit(self): + # all requests from the same ip + # 19 events, batch size 10 => 20+1 events per requ + codes = self.fire_events("ratelimit.ndjson", 3) + assert set(codes.keys()) == set([202]), 
codes + + def test_rate_limit_hit(self): + # all requests from the same ip + codes = self.fire_events("ratelimit.ndjson", 5) + assert set(codes.keys()) == set([202, 429]), codes + assert codes[429] == 2, codes + assert codes[202] == 3, codes + + def test_rate_limit_small_hit(self): + # all requests from the same ip + # 4 events, batch size 10 => 10+1 events per requ + codes = self.fire_events("events.ndjson", 8) + assert set(codes.keys()) == set([202, 429]), codes + assert codes[429] == 3, codes + assert codes[202] == 5, codes + + def test_rate_limit_only_metadata(self): + # all requests from the same ip + # no events, batch size 10 => 10+1 events per requ + codes = self.fire_events("only-metadata.ndjson", 8) + assert set(codes.keys()) == set([202, 429]), codes + assert codes[429] == 3, codes + assert codes[202] == 5, codes + + def test_multiple_ips_rate_limit(self): + # requests from 2 different ips + codes = self.fire_events("ratelimit.ndjson", 6, True) + assert set(codes.keys()) == set([202]), codes + + def test_multiple_ips_rate_limit_hit(self): + # requests from 2 different ips + codes = self.fire_events("ratelimit.ndjson", 10, True) + assert set(codes.keys()) == set([202, 429]), codes + assert codes[429] == 4, codes + assert codes[202] == 6, codes