diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 4d21f0fd7cf..123da7ed75e 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -3,3 +3,283 @@ include::./changelogs/6.4.asciidoc[]
 include::./changelogs/6.3.asciidoc[]
 include::./changelogs/6.2.asciidoc[]
 include::./changelogs/6.1.asciidoc[]
+[[release-notes-head]]
+== APM Server version HEAD
+
+// These have to be under a == headline unfortunately:
+// Use these for links to issues and pulls. Note issues and pulls redirect to
+// each other on GitHub, so don't worry too much about using the right prefix.
+:issue: https://github.com/elastic/apm-server/issues/
+:pull: https://github.com/elastic/apm-server/pull/
+
+https://github.com/elastic/apm-server/compare/6.4\...master[View commits]
+
+[float]
+=== Added
+
+- Provide basic server information at `/` {pull}1197[1197]
+- Add pipeline registration and pipeline usage {pull}1258[1258], {pull}1296[1296]
+- Allow sending `tags` for `spans`, which get indexed in ES {pull}1156[1156].
+- Update JSON schema spec for v2 {pull}1303[1303]
+- Make JSON schema spec distributed tracing compliant {pull}1303[1303]
+- Add required `transaction.span_count.started` to Intake v2 {pull}1351[1351]
+- Rename `transaction.span_count.dropped.total` to `transaction.span_count.dropped` on Intake API v2 {pull}1351[1351]
+- Require either `message` or `type` for `error.exception` {pull}1354[1354].
+- Require `span.parent_id`, forbid `null` for `span.trace_id`, `transaction.trace_id` {pull}1391[1391]
+- Require `error.id` and change its description to a 128-bit random ID {pull}1384[1384]
+- Add rate limit handling per event {pull}1367[1367]
+
+[[release-notes-6.4]]
+== APM Server version 6.4
+
+https://github.com/elastic/apm-server/compare/6.3\...6.4[View commits]
+
+* <<release-notes-6.4.0>>
+
+[[release-notes-6.4.0]]
+=== APM Server version 6.4.0
+
+https://github.com/elastic/apm-server/compare/v6.3.2\...v6.4.0[View commits]
+
+- Change `frontend` to `rum` in config file, but still support `frontend` for backwards compatibility {pull}1155[1155].
+- Support `rum` endpoint and keep `client-side` endpoint for backwards compatibility {pull}1155[1155].
+- Listen on default port 8200 if unspecified {pull}886[886].
+- Update Go to 1.10.3 {pull}1054[1054].
+- Combine `apm-server.yml` and `apm-server.reference.yml` into one file {pull}958[958].
+- Add optional tracing for the server's API and event publisher {pull}816[816].
+- Read sourcemap content and fill context lines {pull}972[972].
+- Remove regexProperties validation rules {pull}1148[1148], {pull}1150[1150].
+- Add `source_mapping.elasticsearch` configuration option {pull}1114[1114].
+- Add `/v1/metrics` endpoint {pull}1000[1000] {pull}1121[1121].
+- Push onboarding doc to separate ES index {pull}1159[1159].
+- Deprecate usage of `apm-server setup` for dashboards and index-pattern {pull}1142[1142], {pull}1261[1261].
+- Disable metrics logging by default {pull}1127[1127].
+
+
+[[release-notes-6.3]]
+== APM Server version 6.3
+
+https://github.com/elastic/apm-server/compare/6.2\...6.3[View commits]
+
+* <<release-notes-6.3.2>>
+* <<release-notes-6.3.1>>
+* <<release-notes-6.3.0>>
+
+
+[[release-notes-6.3.2]]
+=== APM Server version 6.3.2
+
+https://github.com/elastic/apm-server/compare/v6.3.1\...v6.3.2[View commits]
+
+No significant changes.
+
+
+[[release-notes-6.3.1]]
+=== APM Server version 6.3.1
+
+https://github.com/elastic/apm-server/compare/v6.3.0\...v6.3.1[View commits]
+
+No significant changes.
+
+
+[[release-notes-6.3.0]]
+=== APM Server version 6.3.0
+
+https://github.com/elastic/apm-server/compare/v6.2.4\...v6.3.0[View commits]
+
+[float]
+==== Bug fixes
+
+- Accept charset in request's content type {pull}677[677].
+- Use type integer instead of number in JSON schemas where applicable {pull}641[641].
+- Set array item types to string in JSON schemas {pull}651[651].
+- Fix issue preventing server from being stopped {pull}704[704].
+- Limit the number of concurrent requests being processed {pull}731[731].
+- Return proper response code for request entity too large {pull}862[862].
+- Make APM Server docker image listen on all interfaces by default https://github.com/elastic/apm-server-docker/pull/16[apm-server-docker#16]
+
+[float]
+==== Added
+
+- Enrich data with IP and User Agent {pull}393[393], {pull}701[701], {pull}730[730], {pull}923[923].
+- Push errors and transactions to different ES indices {pull}706[706].
+- Allow custom `error.log.level` {pull}712[712].
+- Change `concurrent_request` default from 40 to 5 {pull}731[731].
+- Change `max_unzipped_size` default from 50mb to 30mb {pull}731[731].
+- Change `read_timeout` and `write_timeout` defaults from 2s to 30s {pull}748[748], {pull}752[752].
+- Limit number of new connections to accept simultaneously {pull}751[751].
+- Push spans to separate ES index {pull}774[774].
+- Update Go to 1.9.4 {pull}786[786].
+- Listen on unix domain socket with `host=unix:/path` {pull}768[768].
+- Make timestamp optional in the intake API {pull}819[819].
+
+
+[[release-notes-6.2]]
+== APM Server version 6.2
+
+https://github.com/elastic/apm-server/compare/6.1\...6.2[View commits]
+
+* <<release-notes-6.2.4>>
+* <<release-notes-6.2.3>>
+* <<release-notes-6.2.2>>
+* <<release-notes-6.2.1>>
+* <<release-notes-6.2.0>>
+
+
+[[release-notes-6.2.4]]
+=== APM Server version 6.2.4
+
+https://github.com/elastic/apm-server/compare/v6.2.3\...v6.2.4[View commits]
+
+No significant changes.
+
+[[release-notes-6.2.3]]
+=== APM Server version 6.2.3
+
+https://github.com/elastic/apm-server/compare/v6.2.2\...v6.2.3[View commits]
+
+No significant changes.
+
+[[release-notes-6.2.2]]
+=== APM Server version 6.2.2
+
+https://github.com/elastic/apm-server/compare/v6.2.1\...v6.2.2[View commits]
+
+No significant changes.
+
+[[release-notes-6.2.1]]
+=== APM Server version 6.2.1
+
+https://github.com/elastic/apm-server/compare/v6.2.0\...v6.2.1[View commits]
+
+No significant changes.
+
+[[release-notes-6.2.0]]
+=== APM Server version 6.2.0
+
+https://github.com/elastic/apm-server/compare/v6.1.4\...v6.2.0[View commits]
+
+[float]
+==== Breaking changes
+- Rename and reverse boolean `in_app` to `library_frame` {pull}385[385].
+- Rename `app` to `service` {pull}377[377]
+- Move `trace.transaction_id` to `transaction.id` {pull}345[345], {pull}347[347], {pull}371[371]
+- Rename `trace` to `span` {pull}352[352].
+- Rename and reverse boolean `exception.uncaught` to `exception.handled` {pull}434[434].
+- Move process related fields to their own namespace {pull}445[445].
+- Rename Kibana directories according to changed structure in beats framework {pull}454[454]
+- Change config option `max_header_bytes` to `max_header_size` {pull}492[492].
+- Change config option `frontend.sourcemapping.index` to `frontend.source_mapping.index_pattern` and remove adding a '*' by default {pull}492[492].
+- Remove untested config options from config yml files {pull}496[496]
+
+[float]
+==== Bug fixes
+- Update systemd doc URL {pull}354[354]
+- Update readme doc URLs {pull}356[356]
+- Use updated stack trace frame values for calculating error `grouping_keys` {pull}485[485]
+- Fix panic when a signal is delivered before the server is instantiated {pull}580[580]
+
+[float]
+==== Added
+- Add `service.environment` {pull}366[366]
+- Consider exception or log message for grouping key when nothing else is available {pull}435[435]
+- Add `context.request.url.full` {pull}436[436]
+- Report more detail on max data size error {pull}442[442]
+- Increase default `MaxUnzippedSize` from 10mb to 50mb {pull}439[439]
+- Add `transaction.id` to errors {pull}437[437]
+- Support for `transaction.marks` {pull}430[430]
+- Support for uploading sourcemaps {pull}302[302].
+- Support for sourcemap mapping on incoming frontend requests {pull}381[381], {pull}462[462], {pull}502[502]
+- Support for `transaction.span_count.dropped.total` {pull}448[448].
+- Optional field `transaction.sampled` {pull}441[441]
+- Add Kibana sourcefilter for `sourcemap.sourcemap` {pull}454[454]
+- Increase default `ConcurrentRequests` from 20 to 40 {pull}492[492]
+- Add config option for excluding stack trace frames from `grouping_key` calculation {pull}482[482]
+- Expose expvar {pull}509[509]
+- Add `process.ppid` as optional field {pull}564[564]
+- Change `error.culprit` after successfully applying sourcemapping {pull}520[520]
+- Make `transaction.name` optional {pull}554[554]
+- Remove config files from beats. Manually add relevant config options {pull}578[578]
+- Use separate index for uploaded `source maps` {pull}582[582].
+- Store original values when applying source mapping or changing `library_frame` value {pull}647[647]
+
+
+[[release-notes-6.1]]
+== APM Server version 6.1
+
+https://github.com/elastic/apm-server/compare/6.0\...6.1[View commits]
+
+* <<release-notes-6.1.4>>
+* <<release-notes-6.1.3>>
+* <<release-notes-6.1.2>>
+* <<release-notes-6.1.1>>
+* <<release-notes-6.1.0>>
+
+
+[[release-notes-6.1.4]]
+=== APM Server version 6.1.4
+
+https://github.com/elastic/apm-server/compare/v6.1.3\...v6.1.4[View commits]
+
+No significant changes.
+
+
+[[release-notes-6.1.3]]
+=== APM Server version 6.1.3
+
+https://github.com/elastic/apm-server/compare/v6.1.2\...v6.1.3[View commits]
+
+No significant changes.
+
+
+[[release-notes-6.1.2]]
+=== APM Server version 6.1.2
+
+https://github.com/elastic/apm-server/compare/v6.1.1\...v6.1.2[View commits]
+
+No significant changes.
+
+
+[[release-notes-6.1.1]]
+=== APM Server version 6.1.1
+
+https://github.com/elastic/apm-server/compare/v6.1.0\...v6.1.1[View commits]
+
+No significant changes.
+
+
+[[release-notes-6.1.0]]
+=== APM Server version 6.1.0
+
+https://github.com/elastic/apm-server/compare/v6.0.1\...v6.1.0[View commits]
+
+[float]
+==== Breaking changes
+- Allow ES template index prefix to be `apm` {pull}152[152].
+- Remove `git_ref` from Intake API and Elasticsearch output {pull}158[158].
+- Switch to Go 1.9.2
+
+[float]
+==== Bug fixes
+- Fix dashboard loading for Kibana 5.x {pull}221[221].
+- Fix command for loading dashboards in docs {pull}205[205].
+- Log a warning message if secret token is set but SSL is not {pull}204[204].
+- Fix wrong content-type in response {pull}171[171].
+- Remove duplicate dashboard entries {pull}162[162].
+- Remove `context.db` from `fields.yml` for consistency; it has not been indexed before {pull}159[159].
+- Update dashboard with fix for rpm graphs {pull}315[315].
+- Dashboards: Remove time from url_templates {pull}321[321].
+
+[float]
+==== Added
+- Added wildcard matching for allowed origins for frontend {pull}287[287].
+- Added rate limit per IP for frontend {pull}257[257].
+- Allow null for all optional fields {pull}253[253].
+- Make context.app.language.version optional {pull}246[246].
+- CORS support for frontend {pull}244[244].
+- Added support for frontend {pull}227[227].
+- Show transaction.result in Requests per Minute {pull}226[226].
+- Added Kibana 5.6 compatible dashboards {pull}208[208].
+- Send document to output on start of server {pull}117[117].
+- Log frontend status at startup {pull}284[284].
diff --git a/_meta/beat.yml b/_meta/beat.yml
index 3036af0268a..962e1b266c8 100644
--- a/_meta/beat.yml
+++ b/_meta/beat.yml
@@ -49,9 +49,26 @@ apm-server:
     # To enable real user monitoring (RUM) support set this to true.
     #enabled: false

+    #-- v1 RUM endpoint
+
     # Rate limit per second and IP address for requests sent to the RUM endpoint.
     #rate_limit: 10

+    #-- v2 RUM endpoint
+
+    #event_rate:
+
+      # Defines the maximum number of events allowed to be sent to the APM Server v2 RUM
+      # endpoint per IP per second. Defaults to 300.
+      #limit: 300
+
+      # An LRU cache is used to keep a rate limit per IP for the most recently seen IPs.
+      # This setting defines the number of unique IPs that can be tracked in the cache.
+      # Sites with many concurrent clients should consider increasing this limit. Defaults to 1000.
+      #lru_size: 1000
+
+    #-- General RUM settings
+
     # Comma separated list of permitted origins for real user monitoring.
     # User-agents will send an origin header that will be validated against this list.
     # An origin is made of a protocol scheme, host and port, without the url path.
@@ -97,6 +114,7 @@ apm-server:
     # is changed, a matching index pattern needs to be specified here.
     #index_pattern: "apm-*-sourcemap*"

+
     # If set to true, APM Server augments data received by the agent with the original IP of the backend server,
     # or the IP and User Agent of the real user (RUM requests). It defaults to true.
     #capture_personal_data: true
diff --git a/apm-server.yml b/apm-server.yml
index e508061cd8d..a1a9f11f987 100644
--- a/apm-server.yml
+++ b/apm-server.yml
@@ -49,9 +49,26 @@ apm-server:
     # To enable real user monitoring (RUM) support set this to true.
     #enabled: false

+    #-- v1 RUM endpoint
+
     # Rate limit per second and IP address for requests sent to the RUM endpoint.
     #rate_limit: 10

+    #-- v2 RUM endpoint
+
+    #event_rate:
+
+      # Defines the maximum number of events allowed to be sent to the APM Server v2 RUM
+      # endpoint per IP per second. Defaults to 300.
+      #limit: 300
+
+      # An LRU cache is used to keep a rate limit per IP for the most recently seen IPs.
+      # This setting defines the number of unique IPs that can be tracked in the cache.
+      # Sites with many concurrent clients should consider increasing this limit. Defaults to 1000.
+      #lru_size: 1000
+
+    #-- General RUM settings
+
     # Comma separated list of permitted origins for real user monitoring.
     # User-agents will send an origin header that will be validated against this list.
     # An origin is made of a protocol scheme, host and port, without the url path.
@@ -97,6 +114,7 @@ apm-server:
     # is changed, a matching index pattern needs to be specified here.
     #index_pattern: "apm-*-sourcemap*"

+
     # If set to true, APM Server augments data received by the agent with the original IP of the backend server,
     # or the IP and User Agent of the real user (RUM requests). It defaults to true.
#capture_personal_data: true diff --git a/beater/approved-stream-result/TestRequestIntegrationRumRateLimit.approved.json b/beater/approved-stream-result/TestRequestIntegrationRumRateLimit.approved.json new file mode 100644 index 00000000000..748c625209c --- /dev/null +++ b/beater/approved-stream-result/TestRequestIntegrationRumRateLimit.approved.json @@ -0,0 +1,8 @@ +{ + "accepted": 30, + "errors": [ + { + "message": "rate limit exceeded" + } + ] +} diff --git a/beater/beater.go b/beater/beater.go index 370420bb8a3..01387fe5a02 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -18,16 +18,13 @@ package beater import ( - "fmt" "net" "net/http" "net/url" "os" - "regexp" "sync" "time" - "github.com/pkg/errors" "golang.org/x/sync/errgroup" "github.com/elastic/apm-agent-go" @@ -52,18 +49,11 @@ type beater struct { // Creates beater func New(b *beat.Beat, ucfg *common.Config) (beat.Beater, error) { logger := logp.NewLogger("beater") - beaterConfig := defaultConfig(b.Info.Version) - if err := ucfg.Unpack(beaterConfig); err != nil { - return nil, errors.Wrap(err, "Error processing configuration") + beaterConfig, err := NewConfig(b.Info.Version, ucfg) + if err != nil { + return nil, err } - beaterConfig.SetRumConfig() if beaterConfig.RumConfig.isEnabled() { - if _, err := regexp.Compile(beaterConfig.RumConfig.LibraryPattern); err != nil { - return nil, errors.New(fmt.Sprintf("Invalid regex for `library_pattern`: %v", err.Error())) - } - if _, err := regexp.Compile(beaterConfig.RumConfig.ExcludeFromGrouping); err != nil { - return nil, errors.New(fmt.Sprintf("Invalid regex for `exclude_from_grouping`: %v", err.Error())) - } if b.Config != nil && beaterConfig.RumConfig.SourceMapping.EsConfig == nil { // fall back to elasticsearch output configuration for sourcemap storage if possible if isElasticsearchOutput(b) { diff --git a/beater/beater_test.go b/beater/beater_test.go index fede24acc49..a6cd11d4af7 100644 --- a/beater/beater_test.go +++ b/beater/beater_test.go @@ -85,8 +85,12 @@ func TestBeatConfig(t *testing.T) { "url": "/debug/vars", }, "frontend": map[string]interface{}{ - "enabled": true, - "rate_limit": 1000, + "enabled": true, + "rate_limit": 1000, + "event_rate": map[string]interface{}{ + "limit": 7200, + "lru_size": 2000, + }, "allow_origins": []string{"example*"}, "source_mapping": map[string]interface{}{ "cache": map[string]interface{}{ @@ -127,8 +131,12 @@ func TestBeatConfig(t *testing.T) { Url: "/debug/vars", }, FrontendConfig: &rumConfig{ - Enabled: &truthy, - RateLimit: 1000, + Enabled: &truthy, + RateLimit: 1000, + EventRate: &eventRate{ + Limit: 7200, + LruSize: 2000, + }, AllowOrigins: []string{"example*"}, SourceMapping: &SourceMapping{ Cache: &Cache{Expiration: 8 * time.Minute}, @@ -139,8 +147,12 @@ func TestBeatConfig(t *testing.T) { beatVersion: "6.2.0", }, RumConfig: &rumConfig{ - Enabled: &truthy, - RateLimit: 1000, + Enabled: &truthy, + RateLimit: 1000, + EventRate: &eventRate{ + Limit: 7200, + LruSize: 2000, + }, AllowOrigins: []string{"example*"}, SourceMapping: &SourceMapping{ Cache: &Cache{Expiration: 8 * time.Minute}, @@ -181,6 +193,9 @@ func TestBeatConfig(t *testing.T) { "frontend": map[string]interface{}{ "enabled": true, "rate_limit": 890, + "event_rate": map[string]interface{}{ + "lru_size": 200, + }, "source_mapping": map[string]interface{}{ "cache": map[string]interface{}{ "expiration": 4, @@ -223,6 +238,10 @@ func TestBeatConfig(t *testing.T) { FrontendConfig: &rumConfig{ Enabled: &truthy, RateLimit: 890, + EventRate: &eventRate{ + Limit: 
300, + LruSize: 200, + }, SourceMapping: &SourceMapping{ Cache: &Cache{ Expiration: 4 * time.Second, @@ -235,8 +254,12 @@ func TestBeatConfig(t *testing.T) { beatVersion: "6.2.0", }, RumConfig: &rumConfig{ - Enabled: &truthy, - RateLimit: 10, + Enabled: &truthy, + RateLimit: 10, + EventRate: &eventRate{ + Limit: 300, + LruSize: 1000, + }, AllowOrigins: []string{"*"}, SourceMapping: &SourceMapping{ Cache: &Cache{ diff --git a/beater/common_handlers.go b/beater/common_handlers.go index 65946eeb4ac..cd9f1d61a15 100644 --- a/beater/common_handlers.go +++ b/beater/common_handlers.go @@ -27,7 +27,7 @@ import ( "strings" "time" - "github.com/hashicorp/golang-lru" + lru "github.com/hashicorp/golang-lru" "github.com/pkg/errors" "github.com/ryanuber/go-glob" "github.com/satori/go.uuid" @@ -46,9 +46,6 @@ import ( ) const ( - rateLimitCacheSize = 1000 - rateLimitBurstMultiplier = 2 - supportedHeaders = "Content-Type, Content-Encoding, Accept" supportedMethods = "POST, OPTIONS" ) @@ -119,10 +116,11 @@ var ( counter: validateCounter, } } + rateLimitCounter = counter("response.errors.ratelimit") rateLimitedResponse = serverResponse{ err: errors.New("too many requests"), code: http.StatusTooManyRequests, - counter: counter("response.errors.ratelimit"), + counter: rateLimitCounter, } methodNotAllowedCounter = counter("response.errors.method") methodNotAllowedResponse = serverResponse{ @@ -165,7 +163,7 @@ func newMuxer(beaterConfig *Config, report publish.Reporter) *http.ServeMux { for path, route := range V2Routes { logger.Infof("Path %s added to request handler", path) - mux.Handle(path, route.Handler(beaterConfig, report)) + mux.Handle(path, route.Handler(path, beaterConfig, report)) } mux.Handle(rootURL, rootHandler(beaterConfig.SecretToken)) @@ -237,9 +235,11 @@ func rootHandler(secretToken string) http.Handler { return logHandler(handler) } -type contextKey string +type reqLoggerKey struct{} -const reqLoggerContextKey = contextKey("requestLogger") +func ContextWithReqLogger(ctx context.Context, rl *logp.Logger) context.Context { + return context.WithValue(ctx, reqLoggerKey{}, rl) +} func logHandler(h http.Handler) http.Handler { logger := logp.NewLogger("request") @@ -257,13 +257,8 @@ func logHandler(h http.Handler) http.Handler { "remote_address", utility.RemoteAddr(r), "user-agent", r.Header.Get("User-Agent")) - lr := r.WithContext( - context.WithValue(r.Context(), reqLoggerContextKey, reqLogger), - ) - lw := utility.NewRecordingResponseWriter(w) - - h.ServeHTTP(lw, lr) + h.ServeHTTP(lw, r.WithContext(ContextWithReqLogger(r.Context(), reqLogger))) if lw.Code <= 399 { reqLogger.Infow("handled request", []interface{}{"response_code", lw.Code}...) 
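A note on the context-key change above: replacing the string-typed `contextKey` with the unexported empty struct `reqLoggerKey` is the standard Go guard against context-key collisions, since no other package can construct a value of an unexported key type, and the exported `ContextWithReqLogger` helper keeps write access explicit. A minimal, standalone sketch of the idiom follows; the names and the string payload are illustrative stand-ins, not part of this patch:

package main

import (
	"context"
	"fmt"
)

// loggerKey is unexported: code outside this package cannot create a
// value of this type, so collisions in the context are impossible.
type loggerKey struct{}

// WithLogger stores the logger under the private key.
func WithLogger(ctx context.Context, logger string) context.Context {
	return context.WithValue(ctx, loggerKey{}, logger)
}

// LoggerFrom retrieves it, falling back to a default when absent,
// mirroring how requestLogger falls back to logp.NewLogger("request").
func LoggerFrom(ctx context.Context) string {
	if l, ok := ctx.Value(loggerKey{}).(string); ok {
		return l
	}
	return "fallback"
}

func main() {
	ctx := WithLogger(context.Background(), "request-logger")
	fmt.Println(LoggerFrom(ctx))                  // request-logger
	fmt.Println(LoggerFrom(context.Background())) // fallback
}

The same pattern reappears below in stream_processor.go as `rateLimiterKey`.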
@@ -281,7 +276,7 @@ func requestTimeHandler(h http.Handler) http.Handler {
 // requestLogger is a convenience function to retrieve the logger that was
 // added to the request context by handler `logHandler`
 func requestLogger(r *http.Request) *logp.Logger {
-	logger, ok := r.Context().Value(reqLoggerContextKey).(*logp.Logger)
+	logger, ok := r.Context().Value(reqLoggerKey{}).(*logp.Logger)
 	if !ok {
 		logger = logp.NewLogger("request")
 	}
@@ -298,6 +293,11 @@ func killSwitchHandler(killSwitch bool, h http.Handler) http.Handler {
 	})
 }

+const (
+	rateLimitCacheSize       = 1000
+	rateLimitBurstMultiplier = 2
+)
+
 func ipRateLimitHandler(rateLimit int, h http.Handler) http.Handler {
 	cache, _ := lru.New(rateLimitCacheSize)
diff --git a/beater/config.go b/beater/config.go
index 6bcb4770981..60e2c4f3e52 100644
--- a/beater/config.go
+++ b/beater/config.go
@@ -18,11 +18,14 @@ package beater

 import (
+	"fmt"
 	"net"
 	"path/filepath"
 	"regexp"
 	"time"

+	"github.com/pkg/errors"
+
 	"github.com/elastic/apm-server/sourcemap"
 	"github.com/elastic/beats/libbeat/common"
 	"github.com/elastic/beats/libbeat/outputs"
@@ -60,6 +63,7 @@ type ExpvarConfig struct {
 type rumConfig struct {
 	Enabled             *bool      `config:"enabled"`
 	RateLimit           int        `config:"rate_limit"`
+	EventRate           *eventRate `config:"event_rate"`
 	AllowOrigins        []string   `config:"allow_origins"`
 	LibraryPattern      string     `config:"library_pattern"`
 	ExcludeFromGrouping string     `config:"exclude_from_grouping"`
@@ -68,6 +72,11 @@ type rumConfig struct {
 	beatVersion string
 }

+type eventRate struct {
+	Limit   int `config:"limit"`
+	LruSize int `config:"lru_size"`
+}
+
 type metricsConfig struct {
 	Enabled *bool `config:"enabled"`
 }
@@ -108,6 +117,24 @@ type InstrumentationConfig struct {
 	Environment *string `config:"environment"`
 }

+func NewConfig(version string, ucfg *common.Config) (*Config, error) {
+	c := defaultConfig(version)
+	if err := ucfg.Unpack(c); err != nil {
+		return nil, errors.Wrap(err, "Error processing configuration")
+	}
+
+	c.setRumConfig()
+	if c.RumConfig.isEnabled() {
+		if _, err := regexp.Compile(c.RumConfig.LibraryPattern); err != nil {
+			return nil, errors.New(fmt.Sprintf("Invalid regex for `library_pattern`: %v", err.Error()))
+		}
+		if _, err := regexp.Compile(c.RumConfig.ExcludeFromGrouping); err != nil {
+			return nil, errors.New(fmt.Sprintf("Invalid regex for `exclude_from_grouping`: %v", err.Error()))
+		}
+	}
+	return c, nil
+}
+
 func (c *Config) setSmapElasticsearch(esConfig *common.Config) {
 	if c != nil && c.RumConfig.isEnabled() && c.RumConfig.SourceMapping != nil {
 		c.RumConfig.SourceMapping.EsConfig = esConfig
@@ -142,7 +169,7 @@ func (c *pipelineConfig) shouldOverwrite() bool {
 	return c != nil && (c.Overwrite != nil && *c.Overwrite)
 }

-func (c *Config) SetRumConfig() {
+func (c *Config) setRumConfig() {
 	if c.RumConfig != nil && c.RumConfig.Enabled != nil {
 		return
 	}
@@ -182,7 +209,11 @@ func replaceVersion(pattern, version string) string {

 func defaultRum(beatVersion string) *rumConfig {
 	return &rumConfig{
-		RateLimit: 10,
+		RateLimit: 10,
+		EventRate: &eventRate{
+			Limit:   300,
+			LruSize: 1000,
+		},
 		AllowOrigins: []string{"*"},
 		SourceMapping: &SourceMapping{
 			Cache: &Cache{
diff --git a/beater/config_test.go b/beater/config_test.go
index 3775eb7ad6a..816ef696197 100644
--- a/beater/config_test.go
+++ b/beater/config_test.go
@@ -54,6 +54,10 @@ func TestConfig(t *testing.T) {
 		"rum": {
 			"enabled": true,
 			"rate_limit": 800,
+			"event_rate": {
+				"limit": 8000,
+				"lru_size": 2000,
+			},
 			"allow_origins": ["rum*"],
 			"source_mapping": {
 				"cache": {
@@ -67,6 +71,10 @@ func TestConfig(t *testing.T) {
 		"frontend": {
 			"enabled": true,
 			"rate_limit": 1000,
+			"event_rate": {
+				"limit": 1000,
+				"lru_size": 500,
+			},
 			"allow_origins": ["example*"],
 			"source_mapping": {
 				"cache": {
@@ -97,8 +105,12 @@ func TestConfig(t *testing.T) {
 			SecretToken: "1234random",
 			SSL:         &SSLConfig{Enabled: &truthy, Certificate: outputs.CertificateConfig{Certificate: "1234cert", Key: "1234key"}},
 			RumConfig: &rumConfig{
-				Enabled:   &truthy,
-				RateLimit: 800,
+				Enabled:   &truthy,
+				RateLimit: 800,
+				EventRate: &eventRate{
+					Limit:   8000,
+					LruSize: 2000,
+				},
 				AllowOrigins: []string{"rum*"},
 				SourceMapping: &SourceMapping{
 					Cache: &Cache{Expiration: 1 * time.Minute},
@@ -108,8 +120,12 @@ func TestConfig(t *testing.T) {
 				ExcludeFromGrouping: "group_pattern-rum",
 			},
 			FrontendConfig: &rumConfig{
-				Enabled:   &truthy,
-				RateLimit: 1000,
+				Enabled:   &truthy,
+				RateLimit: 1000,
+				EventRate: &eventRate{
+					Limit:   1000,
+					LruSize: 500,
+				},
 				AllowOrigins: []string{"example*"},
 				SourceMapping: &SourceMapping{
 					Cache: &Cache{Expiration: 10 * time.Minute},
@@ -172,6 +188,7 @@ func TestConfig(t *testing.T) {
 			RumConfig: &rumConfig{
 				Enabled:      nil,
 				RateLimit:    0,
+				EventRate:    nil,
 				AllowOrigins: nil,
 				SourceMapping: &SourceMapping{
 					IndexPattern: "",
@@ -265,7 +282,7 @@ func TestIsRumEnabled(t *testing.T) {
 		{c: &Config{RumConfig: &rumConfig{Enabled: &truthy}}, enabled: true},
 		{c: &Config{RumConfig: &rumConfig{Enabled: new(bool)}, FrontendConfig: &rumConfig{Enabled: &truthy}}, enabled: false},
 	} {
-		td.c.SetRumConfig()
+		td.c.setRumConfig()

 		assert.Equal(t, td.enabled, td.c.RumConfig.isEnabled())
 	}
@@ -301,7 +318,7 @@ func TestSetRum(t *testing.T) {
 		{c: &Config{RumConfig: testRumConf, FrontendConfig: testFrontendConf}, rc: testRumConf},
 	}
 	for _, test := range cases {
-		test.c.SetRumConfig()
+		test.c.setRumConfig()
 		assert.Equal(t, test.rc, test.c.RumConfig)
 	}
 }
diff --git a/beater/rl_cache.go b/beater/rl_cache.go
new file mode 100644
index 00000000000..4fd55a527e0
--- /dev/null
+++ b/beater/rl_cache.go
@@ -0,0 +1,88 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package beater
+
+import (
+	"sync"
+
+	"github.com/hashicorp/golang-lru/simplelru"
+	"github.com/pkg/errors"
+	"golang.org/x/time/rate"
+)
+
+const burstMultiplier = 3
+
+// The rlCache is a simple lru cache holding N=size rate limiter entities. Every
+// rate limiter entity allows N=rateLimit hits per key (=IP) per second, and has a
+// burst queue of rateLimit*burstMultiplier.
+// As the used lru cache is of a fixed size, cache entries can get evicted, in
+// which case the evicted limiter is reused as the new rate limiter for the
+// current key. This adds a certain random factor to the rate limiting in case the
+// cache is full. The purpose is to avoid bypassing the rate limiting by sending
+// requests from cache_size*2 unique keys, which would lead to evicted keys and
+// the creation of new rate limiter entities with full allowance.
+type rlCache struct {
+	cache *simplelru.LRU
+	limit int
+
+	mu             sync.Mutex // guards limiter in cache
+	evictedLimiter *rate.Limiter
+}
+
+func NewRlCache(size, rateLimit int) (*rlCache, error) {
+	if size <= 0 || rateLimit < 0 {
+		return nil, errors.New("cache initialization: size must be greater than zero and rateLimit must not be negative")
+	}
+
+	rlc := rlCache{limit: rateLimit}
+
+	var onEvicted = func(_ interface{}, value interface{}) {
+		rlc.evictedLimiter = *value.(**rate.Limiter)
+	}
+
+	c, err := simplelru.NewLRU(size, simplelru.EvictCallback(onEvicted))
+	if err != nil {
+		return nil, err
+	}
+	rlc.cache = c
+	return &rlc, nil
+}
+
+func (rlc *rlCache) getRateLimiter(key string) *rate.Limiter {
+	// fetch the rate limiter from the cache, if a cache is given
+	if rlc.cache == nil || rlc.limit == -1 {
+		return nil
+	}
+
+	// lock get and add actions on the cache to allow proper eviction handling
+	// without race conditions.
+	rlc.mu.Lock()
+	defer rlc.mu.Unlock()
+
+	if l, ok := rlc.cache.Get(key); ok {
+		return *l.(**rate.Limiter)
+	}
+
+	var limiter *rate.Limiter
+	if evicted := rlc.cache.Add(key, &limiter); evicted {
+		limiter = rlc.evictedLimiter
+	} else {
+		limiter = rate.NewLimiter(rate.Limit(rlc.limit), rlc.limit*burstMultiplier)
+	}
+	return limiter
+}
diff --git a/beater/rl_cache_test.go b/beater/rl_cache_test.go
new file mode 100644
index 00000000000..886c7e95ae5
--- /dev/null
+++ b/beater/rl_cache_test.go
@@ -0,0 +1,73 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
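A note on the limiter reuse above: rlCache builds directly on golang.org/x/time/rate semantics, where a limiter created with rate.NewLimiter(limit, burst) admits up to `burst` events immediately and then refills tokens at `limit` per second, so a reused evicted limiter hands the new key whatever allowance its previous key left behind. A minimal, standalone sketch of those semantics; the numbers are illustrative, not from this patch:

package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	// 2 events/second with a burst of 6: the first six Allow calls
	// succeed immediately, the seventh fails until tokens refill.
	lim := rate.NewLimiter(rate.Limit(2), 6)
	for i := 1; i <= 7; i++ {
		fmt.Printf("event %d allowed: %v\n", i, lim.Allow())
	}
}

This is why TestCacheEviction below can drain a limiter through key "a" and then observe the exhaustion through key "c".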
+
+package beater
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCacheInitFails(t *testing.T) {
+	for _, test := range []struct {
+		size  int
+		limit int
+	}{
+		{-1, 1},
+		{0, 1},
+		{1, -1},
+	} {
+		c, err := NewRlCache(test.size, test.limit)
+		assert.Error(t, err)
+		assert.Nil(t, c)
+	}
+}
+
+func TestCacheEviction(t *testing.T) {
+	cache_size := 2
+	limit := 1 // effective burst is limit*burstMultiplier = 3
+
+	rlc, err := NewRlCache(cache_size, limit)
+	require.NoError(t, err)
+
+	// add new limiter
+	rl_a := rlc.getRateLimiter("a")
+	rl_a.AllowN(time.Now(), 3)
+
+	// add new limiter
+	rl_b := rlc.getRateLimiter("b")
+	rl_b.AllowN(time.Now(), 2)
+
+	// reuse evicted limiter rl_a
+	rl_c := rlc.getRateLimiter("c")
+	assert.False(t, rl_c.Allow())
+	assert.Equal(t, rl_c, rlc.evictedLimiter)
+
+	// reuse evicted limiter rl_b
+	rl_d := rlc.getRateLimiter("a")
+	assert.True(t, rl_d.Allow())
+	assert.False(t, rl_d.Allow())
+	assert.Equal(t, rl_d, rlc.evictedLimiter)
+	// check that limiters are independent
+	assert.True(t, rl_d != rl_c)
+	rlc.evictedLimiter = nil
+	assert.NotNil(t, rl_d)
+	assert.NotNil(t, rl_c)
+}
diff --git a/beater/route_config.go b/beater/route_config.go
index 6682e83deed..bdad837dce3 100644
--- a/beater/route_config.go
+++ b/beater/route_config.go
@@ -21,12 +21,11 @@ import (
 	"net/http"
 	"regexp"

-	"github.com/elastic/apm-server/processor/stream"
-
 	"github.com/elastic/apm-server/processor"
 	perr "github.com/elastic/apm-server/processor/error"
 	"github.com/elastic/apm-server/processor/metric"
 	"github.com/elastic/apm-server/processor/sourcemap"
+	"github.com/elastic/apm-server/processor/stream"
 	"github.com/elastic/apm-server/processor/transaction"

 	"github.com/elastic/apm-server/publish"
@@ -79,16 +78,25 @@ var V1Routes = map[string]v1Route{

 var V2Routes = map[string]v2Route{
 	V2BackendURL: v2BackendRoute,
-	V2RumURL:     {rumRouteType},
+	V2RumURL:     v2RumRoute,
 }

-var v2BackendRoute = v2Route{
-	routeType{
-		v2backendHandler,
-		systemMetadataDecoder,
-		func(*Config) transform.Config { return transform.Config{} },
-	},
-}
+var (
+	v2BackendRoute = v2Route{
+		routeType{
+			v2backendHandler,
+			systemMetadataDecoder,
+			func(*Config) transform.Config { return transform.Config{} },
+		},
+	}
+	v2RumRoute = v2Route{
+		routeType{
+			v2rumHandler,
+			userMetaDataDecoder,
+			rumTransformConfig,
+		},
+	}
+)

 var (
 	backendRouteType = routeType{
@@ -127,6 +135,12 @@ func v2backendHandler(beaterConfig *Config, h http.Handler) http.Handler {
 			authHandler(beaterConfig.SecretToken, h)))
 }

+func v2rumHandler(beaterConfig *Config, h http.Handler) http.Handler {
+	return killSwitchHandler(beaterConfig.RumConfig.isEnabled(),
+		requestTimeHandler(
+			corsHandler(beaterConfig.RumConfig.AllowOrigins, h)))
+}
+
 func backendHandler(beaterConfig *Config, h http.Handler) http.Handler {
 	return logHandler(
 		requestTimeHandler(
@@ -198,16 +212,24 @@ type v2Route struct {
 	routeType
 }

-func (v v2Route) Handler(beaterConfig *Config, report publish.Reporter) http.Handler {
+func (v v2Route) Handler(url string, c *Config, report publish.Reporter) http.Handler {
 	reqDecoder := v.configurableDecoder(
-		beaterConfig,
+		c,
 		func(*http.Request) (map[string]interface{}, error) { return map[string]interface{}{}, nil },
 	)
 	v2Handler := v2Handler{
 		requestDecoder:  reqDecoder,
-		streamProcessor: &stream.StreamProcessor{Tconfig: v.transformConfig(beaterConfig)},
+		streamProcessor:
&stream.StreamProcessor{Tconfig: v.transformConfig(c)}, + } + + if url == V2RumURL { + if rlc, err := NewRlCache(c.RumConfig.EventRate.LruSize, c.RumConfig.EventRate.Limit); err == nil { + v2Handler.rlc = rlc + } else { + logp.NewLogger("handler").Error(err.Error()) + } } - return v.wrappingHandler(beaterConfig, v2Handler.Handle(beaterConfig, report)) + return v.wrappingHandler(c, v2Handler.Handle(c, report)) } diff --git a/beater/v2_handler.go b/beater/v2_handler.go index b31683c4441..765a8b16933 100644 --- a/beater/v2_handler.go +++ b/beater/v2_handler.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/apm-server/processor/stream" "github.com/elastic/apm-server/publish" + "github.com/elastic/apm-server/utility" "github.com/elastic/beats/libbeat/logp" @@ -34,6 +35,7 @@ import ( type v2Handler struct { requestDecoder decoder.ReqDecoder streamProcessor *stream.StreamProcessor + rlc *rlCache } func (v *v2Handler) statusCode(sr *stream.Result) (int, *monitoring.Int) { @@ -58,6 +60,9 @@ func (v *v2Handler) statusCode(sr *stream.Result) (int, *monitoring.Int) { case stream.ShuttingDownErrType: code = http.StatusServiceUnavailable ct = serverShuttingDownCounter + case stream.RateLimitErrType: + code = http.StatusTooManyRequests + ct = rateLimitCounter default: code = http.StatusInternalServerError ct = responseErrorsOthers @@ -111,12 +116,27 @@ func (v *v2Handler) Handle(beaterConfig *Config, report publish.Reporter) http.H return } + ctx := r.Context() + if v.rlc != nil { + rl := v.rlc.getRateLimiter(utility.RemoteAddr(r)) + if !rl.Allow() { + sr := stream.Result{} + sr.Add(&stream.Error{ + Type: stream.RateLimitErrType, + Message: "rate limit exceeded", + }) + v.sendResponse(logger, w, &sr) + return + } + ctx = stream.ContextWithRateLimiter(ctx, rl) + } + ndReader, err := decoder.NDJSONStreamDecodeCompressedWithLimit(r, beaterConfig.MaxEventSize) if err != nil { // if we can't set up the ndjsonreader, // we won't be able to make sense of the body sr := stream.Result{} - sr.LimitedAdd(&stream.Error{ + sr.Add(&stream.Error{ Type: stream.InvalidInputErrType, Message: err.Error(), }) @@ -127,12 +147,11 @@ func (v *v2Handler) Handle(beaterConfig *Config, report publish.Reporter) http.H reqMeta, err := v.requestDecoder(r) if err != nil { sr := stream.Result{} - sr.LimitedAdd(err) + sr.Add(err) v.sendResponse(logger, w, &sr) return } - - res := v.streamProcessor.HandleStream(r.Context(), reqMeta, ndReader, report) + res := v.streamProcessor.HandleStream(ctx, reqMeta, ndReader, report) v.sendResponse(logger, w, res) }) diff --git a/beater/v2_handler_test.go b/beater/v2_handler_test.go index 96faa1ebece..b3e09908ab1 100644 --- a/beater/v2_handler_test.go +++ b/beater/v2_handler_test.go @@ -25,10 +25,10 @@ import ( "path/filepath" "testing" - "github.com/elastic/beats/libbeat/monitoring" - "github.com/elastic/apm-server/publish" "github.com/elastic/apm-server/tests" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/monitoring" "github.com/pkg/errors" "github.com/stretchr/testify/assert" @@ -44,7 +44,7 @@ func TestInvalidContentType(t *testing.T) { w := httptest.NewRecorder() c := defaultConfig("7.0.0") - handler := (&v2BackendRoute).Handler(c, nil) + handler := (&v2BackendRoute).Handler("", c, nil) handler.ServeHTTP(w, req) @@ -58,7 +58,7 @@ func TestEmptyRequest(t *testing.T) { w := httptest.NewRecorder() c := defaultConfig("7.0.0") - handler := (&v2BackendRoute).Handler(c, nil) + 
handler := (&v2BackendRoute).Handler("", c, nil) handler.ServeHTTP(w, req) @@ -84,7 +84,7 @@ func TestRequestDecoderError(t *testing.T) { }, } - handler := testRouteWithFaultyDecoder.Handler(c, nil) + handler := testRouteWithFaultyDecoder.Handler("", c, nil) handler.ServeHTTP(w, req) @@ -110,25 +110,16 @@ func TestRequestIntegration(t *testing.T) { {name: "FullQueue", code: http.StatusServiceUnavailable, path: "errors.ndjson", reportingErr: publish.ErrFull, counter: fullQueueCounter}, } { t.Run(test.name, func(t *testing.T) { - b, err := loader.LoadDataAsBytes(filepath.Join("../testdata/intake-v2/", test.path)) - require.NoError(t, err) - bodyReader := bytes.NewBuffer(b) - - req := httptest.NewRequest("POST", V2BackendURL, bodyReader) - req.Header.Add("Content-Type", "application/x-ndjson") - - w := httptest.NewRecorder() - - c := defaultConfig("7.0.0") - report := func(context.Context, publish.PendingReq) error { - return test.reportingErr - } - handler := (&v2BackendRoute).Handler(c, report) - ctSuccess := responseSuccesses.Get() ctFailure := responseErrors.Get() ct := test.counter.Get() - handler.ServeHTTP(w, req) + + w, err := sendReq(defaultConfig("7.0.0"), + &v2BackendRoute, + V2BackendURL, + filepath.Join("../testdata/intake-v2/", test.path), + test.reportingErr) + require.NoError(t, err) assert.Equal(t, test.code, w.Code, w.Body.String()) assert.Equal(t, ct+1, test.counter.Get()) @@ -138,7 +129,7 @@ func TestRequestIntegration(t *testing.T) { assert.Equal(t, ctSuccess+1, responseSuccesses.Get()) assert.Equal(t, ctFailure, responseErrors.Get()) } else { - assert.Equal(t, w.HeaderMap.Get("Content-Type"), "application/json") + assert.Equal(t, "application/json", w.HeaderMap.Get("Content-Type")) assert.Equal(t, ctSuccess, responseSuccesses.Get()) assert.Equal(t, ctFailure+1, responseErrors.Get()) @@ -149,11 +140,56 @@ func TestRequestIntegration(t *testing.T) { } } +func TestRequestIntegrationRUM(t *testing.T) { + for _, test := range []struct { + name string + code int + path string + }{ + {name: "Success", code: http.StatusAccepted, path: "../testdata/intake-v2/errors.ndjson"}, + {name: "RateLimit", code: http.StatusTooManyRequests, path: "../testdata/intake-v2/heavy.ndjson"}, + } { + t.Run(test.name, func(t *testing.T) { + + ucfg, err := common.NewConfigFrom(m{"rum": m{"enabled": true, "event_rate": m{"limit": 9}}}) + require.NoError(t, err) + c, err := NewConfig("7.0.0", ucfg) + require.NoError(t, err) + w, err := sendReq(c, &v2RumRoute, V2RumURL, test.path, nil) + require.NoError(t, err) + + require.Equal(t, test.code, w.Code, w.Body.String()) + if test.code != http.StatusAccepted { + body := w.Body.Bytes() + tests.AssertApproveResult(t, "approved-stream-result/TestRequestIntegrationRum"+test.name, body) + } + }) + } +} + +func sendReq(c *Config, route *v2Route, url string, p string, repErr error) (*httptest.ResponseRecorder, error) { + b, err := loader.LoadDataAsBytes(p) + if err != nil { + return nil, err + } + req := httptest.NewRequest("POST", url, bytes.NewBuffer(b)) + req.Header.Add("Content-Type", "application/x-ndjson") + + report := func(context.Context, publish.PendingReq) error { + return repErr + } + handler := route.Handler(url, c, report) + + w := httptest.NewRecorder() + handler.ServeHTTP(w, req) + return w, nil +} + func TestV2WrongMethod(t *testing.T) { req := httptest.NewRequest("GET", "/intake/v2/events", nil) req.Header.Add("Content-Type", "application/x-ndjson") w := httptest.NewRecorder() - handler := (&v2BackendRoute).Handler(defaultConfig("7.0.0"), nil) + 
handler := (&v2BackendRoute).Handler("", defaultConfig("7.0.0"), nil) ct := methodNotAllowedCounter.Get() handler.ServeHTTP(w, req) @@ -186,7 +222,7 @@ func TestV2LineExceeded(t *testing.T) { } c := defaultConfig("7.0.0") assert.False(t, lineLimitExceededInTestData(c.MaxEventSize)) - handler := (&v2BackendRoute).Handler(c, report) + handler := (&v2BackendRoute).Handler("", c, report) handler.ServeHTTP(w, req) assert.Equal(t, http.StatusAccepted, w.Code, w.Body.String()) @@ -194,7 +230,7 @@ func TestV2LineExceeded(t *testing.T) { c.MaxEventSize = 20 assert.True(t, lineLimitExceededInTestData(c.MaxEventSize)) - handler = (&v2BackendRoute).Handler(c, report) + handler = (&v2BackendRoute).Handler("", c, report) req = httptest.NewRequest("POST", "/v2/intake", bytes.NewBuffer(b)) req.Header.Add("Content-Type", "application/x-ndjson") diff --git a/processor/sourcemap/package_tests/attrs_test.go b/processor/sourcemap/package_tests/attrs_test.go index a0bd2f479f1..51d70da9c20 100644 --- a/processor/sourcemap/package_tests/attrs_test.go +++ b/processor/sourcemap/package_tests/attrs_test.go @@ -56,6 +56,7 @@ func TestKeywordLimitationOnSourcemapAttributes(t *testing.T) { "sourcemap.service.version": "service_version", "sourcemap.bundle_filepath": "bundle_filepath", } + procSetup.KeywordLimitation(t, tests.NewSet(), mapping) } diff --git a/processor/stream/approved-es-documents/testV2IntakeIntegrationMixedMinimalProcess.approved.json b/processor/stream/approved-es-documents/testV2IntakeIntegrationEvents.approved.json similarity index 100% rename from processor/stream/approved-es-documents/testV2IntakeIntegrationMixedMinimalProcess.approved.json rename to processor/stream/approved-es-documents/testV2IntakeIntegrationEvents.approved.json diff --git a/processor/stream/approved-stream-result/testIntegrationResultMixedMinimalProcess.approved.json b/processor/stream/approved-stream-result/testIntegrationResultEvents.approved.json similarity index 100% rename from processor/stream/approved-stream-result/testIntegrationResultMixedMinimalProcess.approved.json rename to processor/stream/approved-stream-result/testIntegrationResultEvents.approved.json diff --git a/processor/stream/approved-stream-result/testIntegrationResultLimiterAllowAll.approved.json b/processor/stream/approved-stream-result/testIntegrationResultLimiterAllowAll.approved.json new file mode 100644 index 00000000000..c612a4faf20 --- /dev/null +++ b/processor/stream/approved-stream-result/testIntegrationResultLimiterAllowAll.approved.json @@ -0,0 +1,3 @@ +{ + "accepted": 19 +} diff --git a/processor/stream/approved-stream-result/testIntegrationResultLimiterDeny.approved.json b/processor/stream/approved-stream-result/testIntegrationResultLimiterDeny.approved.json new file mode 100644 index 00000000000..8333c5a198a --- /dev/null +++ b/processor/stream/approved-stream-result/testIntegrationResultLimiterDeny.approved.json @@ -0,0 +1,8 @@ +{ + "accepted": 10, + "errors": [ + { + "message": "rate limit exceeded" + } + ] +} diff --git a/processor/stream/approved-stream-result/testIntegrationResultLimiterDenyAll.approved.json b/processor/stream/approved-stream-result/testIntegrationResultLimiterDenyAll.approved.json new file mode 100644 index 00000000000..507b15773aa --- /dev/null +++ b/processor/stream/approved-stream-result/testIntegrationResultLimiterDenyAll.approved.json @@ -0,0 +1,8 @@ +{ + "accepted": 0, + "errors": [ + { + "message": "rate limit exceeded" + } + ] +} diff --git 
a/processor/stream/approved-stream-result/testIntegrationResultLimiterPartiallyUsedLimitAllow.approved.json b/processor/stream/approved-stream-result/testIntegrationResultLimiterPartiallyUsedLimitAllow.approved.json
new file mode 100644
index 00000000000..c612a4faf20
--- /dev/null
+++ b/processor/stream/approved-stream-result/testIntegrationResultLimiterPartiallyUsedLimitAllow.approved.json
@@ -0,0 +1,3 @@
+{
+  "accepted": 19
+}
diff --git a/processor/stream/approved-stream-result/testIntegrationResultLimiterPartiallyUsedLimitDeny.approved.json b/processor/stream/approved-stream-result/testIntegrationResultLimiterPartiallyUsedLimitDeny.approved.json
new file mode 100644
index 00000000000..8333c5a198a
--- /dev/null
+++ b/processor/stream/approved-stream-result/testIntegrationResultLimiterPartiallyUsedLimitDeny.approved.json
@@ -0,0 +1,8 @@
+{
+  "accepted": 10,
+  "errors": [
+    {
+      "message": "rate limit exceeded"
+    }
+  ]
+}
diff --git a/processor/stream/approved-stream-result/testIntegrationResultNoLimiter.approved.json b/processor/stream/approved-stream-result/testIntegrationResultNoLimiter.approved.json
new file mode 100644
index 00000000000..c612a4faf20
--- /dev/null
+++ b/processor/stream/approved-stream-result/testIntegrationResultNoLimiter.approved.json
@@ -0,0 +1,3 @@
+{
+  "accepted": 19
+}
diff --git a/processor/stream/benchmark_test.go b/processor/stream/benchmark_test.go
index 56b669a9974..d2a3a4f2b5b 100644
--- a/processor/stream/benchmark_test.go
+++ b/processor/stream/benchmark_test.go
@@ -22,10 +22,13 @@ import (
 	"context"
 	"errors"
 	"io/ioutil"
+	"math"
 	"path/filepath"
 	"runtime"
 	"testing"

+	r "golang.org/x/time/rate"
+
 	"github.com/elastic/apm-server/decoder"
 	"github.com/elastic/apm-server/publish"
 	"github.com/elastic/apm-server/tests/loader"
@@ -44,7 +47,8 @@ func BenchmarkStreamProcessor(b *testing.B) {
 	if err != nil {
 		b.Error(err)
 	}
-	ctx := context.Background()
+	// make sure the rate limit is never hit, as the limiter's blocking wait would otherwise be measured
+	ctx := ContextWithRateLimiter(context.Background(), r.NewLimiter(r.Limit(math.MaxFloat64-1), math.MaxInt32))
 	sp := &StreamProcessor{}
 	for _, f := range files {
 		b.Run(f.Name(), func(b *testing.B) {
diff --git a/processor/stream/result.go b/processor/stream/result.go
index e6abc34c3bf..7245ad63371 100644
--- a/processor/stream/result.go
+++ b/processor/stream/result.go
@@ -46,6 +46,7 @@ const (
 	ShuttingDownErrType
 	ServerErrType
 	MethodForbiddenErrType
+	RateLimitErrType
 )

 const (
diff --git a/processor/stream/stream_processor.go b/processor/stream/stream_processor.go
index ec795f2c13c..de02567f507 100644
--- a/processor/stream/stream_processor.go
+++ b/processor/stream/stream_processor.go
@@ -21,9 +21,12 @@ import (
 	"context"
 	"errors"
 	"io"
+	"time"

 	"github.com/santhosh-tekuri/jsonschema"

+	"golang.org/x/time/rate"
+
 	"github.com/elastic/apm-server/decoder"
 	er "github.com/elastic/apm-server/model/error"
 	"github.com/elastic/apm-server/model/metadata"
@@ -36,6 +39,19 @@ import (
 	"github.com/elastic/apm-server/validation"
 )

+type rateLimiterKey struct{}
+
+func ContextWithRateLimiter(ctx context.Context, limiter *rate.Limiter) context.Context {
+	return context.WithValue(ctx, rateLimiterKey{}, limiter)
+}
+
+func rateLimiterFromContext(ctx context.Context) *rate.Limiter {
+	if lim, ok := ctx.Value(rateLimiterKey{}).(*rate.Limiter); ok {
+		return lim
+	}
+	return nil
+}
+
 var (
 	ErrUnrecognizedObject = errors.New("did not recognize object type")
 )
@@ -174,12 +190,29 @@ func (v *StreamProcessor) handleRawModel(rawModel map[string]interface{}) (trans
 // readBatch will read up to `batchSize` objects from the ndjson stream;
 // it returns a slice of eventables and a bool that indicates whether the stream is exhausted.
-func (s *StreamProcessor) readBatch(batchSize int, reader StreamReader, response *Result) ([]transform.Transformable, bool) {
-	var err error
-	var rawModel map[string]interface{}
+func (s *StreamProcessor) readBatch(ctx context.Context, rl *rate.Limiter, batchSize int, reader StreamReader, response *Result) ([]transform.Transformable, bool) {
+	var (
+		err        error
+		rawModel   map[string]interface{}
+		eventables []transform.Transformable
+	)
+
+	if rl != nil {
+		// use provided rate limiter to throttle batch read
+		ctxT, cancel := context.WithTimeout(ctx, time.Second)
+		err = rl.WaitN(ctxT, batchSize)
+		cancel()
+		if err != nil {
+			response.Add(&Error{
+				Type:    RateLimitErrType,
+				Message: "rate limit exceeded",
+			})
+			return eventables, true
+		}
+	}

-	var eventables []transform.Transformable
 	for i := 0; i < batchSize && err == nil; i++ {
+
 		rawModel, err = reader.Read()

 		if err != nil && err != io.EOF {
@@ -227,9 +260,11 @@ func (s *StreamProcessor) HandleStream(ctx context.Context, meta map[string]inte
 		Config:   s.Tconfig,
 		Metadata: *metadata,
 	}
+	rl := rateLimiterFromContext(ctx)

 	for {
-		transformables, done := s.readBatch(batchSize, jsonReader, res)
+
+		transformables, done := s.readBatch(ctx, rl, batchSize, jsonReader, res)
 		if transformables != nil {
 			err := report(ctx, publish.PendingReq{
 				Transformables: transformables,
diff --git a/processor/stream/stream_processor_test.go b/processor/stream/stream_processor_test.go
index bf4802acfbb..eb4f6f0887d 100644
--- a/processor/stream/stream_processor_test.go
+++ b/processor/stream/stream_processor_test.go
@@ -27,6 +27,8 @@ import (
 	"testing/iotest"
 	"time"

+	"golang.org/x/time/rate"
+
 	"github.com/elastic/apm-server/decoder"
 	"github.com/elastic/apm-server/tests"
 	"github.com/elastic/apm-server/tests/loader"
@@ -122,9 +124,9 @@ func TestIntegration(t *testing.T) {
 		{path: "transactions.ndjson", name: "Transactions"},
 		{path: "spans.ndjson", name: "Spans"},
 		{path: "metrics.ndjson", name: "Metrics"},
-		{path: "minimal_process.ndjson", name: "MixedMinimalProcess"},
-		{path: "minimal_service.ndjson", name: "MinimalService"},
-		{path: "metadata_null_values.ndjson", name: "MetadataNullValues"},
+		{path: "events.ndjson", name: "Events"},
+		{path: "minimal-service.ndjson", name: "MinimalService"},
+		{path: "metadata-null-values.ndjson", name: "MetadataNullValues"},
 		{path: "invalid-event.ndjson", name: "InvalidEvent"},
 		{path: "invalid-json-event.ndjson", name: "InvalidJSONEvent"},
 		{path: "invalid-json-metadata.ndjson", name: "InvalidJSONMetadata"},
@@ -156,3 +158,34 @@ func TestIntegration(t *testing.T) {
 		})
 	}
 }
+
+func TestRateLimiting(t *testing.T) {
+	report := func(ctx context.Context, p publish.PendingReq) error {
+		for range p.Transformables {
+		}
+		return nil
+	}
+
+	b, err := loader.LoadDataAsBytes("../testdata/intake-v2/ratelimit.ndjson")
+	require.NoError(t, err)
+	for _, test := range []struct {
+		name string
+		lim  *rate.Limiter
+		hit  int
+	}{
+		{name: "NoLimiter"},
+		{name: "LimiterDenyAll", lim: rate.NewLimiter(rate.Limit(0), 2)},
+		{name: "LimiterAllowAll", lim: rate.NewLimiter(rate.Limit(40), 40*5)},
+		{name: "LimiterPartiallyUsedLimitAllow", lim: rate.NewLimiter(rate.Limit(10), 10*2), hit: 10},
+		{name: "LimiterPartiallyUsedLimitDeny", lim: rate.NewLimiter(rate.Limit(7), 7*2), hit: 10},
{name: "LimiterDeny", lim: rate.NewLimiter(rate.Limit(6), 6*2)}, + } { + reader := decoder.NewNDJSONStreamReader(bytes.NewReader(b), 100*1024) + if test.hit > 0 { + assert.True(t, test.lim.AllowN(time.Now(), test.hit)) + } + ctx := ContextWithRateLimiter(context.Background(), test.lim) + actualResult := (&StreamProcessor{}).HandleStream(ctx, map[string]interface{}{}, reader, report) + assertApproveResult(t, actualResult, test.name) + } +} diff --git a/testdata/intake-v2/minimal_process.ndjson b/testdata/intake-v2/events.ndjson similarity index 100% rename from testdata/intake-v2/minimal_process.ndjson rename to testdata/intake-v2/events.ndjson diff --git a/testdata/intake-v2/metadata_null_values.ndjson b/testdata/intake-v2/metadata-null-values.ndjson similarity index 100% rename from testdata/intake-v2/metadata_null_values.ndjson rename to testdata/intake-v2/metadata-null-values.ndjson diff --git a/testdata/intake-v2/minimal_service.ndjson b/testdata/intake-v2/minimal-service.ndjson similarity index 100% rename from testdata/intake-v2/minimal_service.ndjson rename to testdata/intake-v2/minimal-service.ndjson diff --git a/testdata/intake-v2/only-metadata.ndjson b/testdata/intake-v2/only-metadata.ndjson new file mode 100644 index 00000000000..5714fb45eed --- /dev/null +++ b/testdata/intake-v2/only-metadata.ndjson @@ -0,0 +1 @@ +{"metadata": {"process": {"ppid": 6789, "pid": 1234, "argv": ["node", "server.js"], "title": "node"}, "system": {"platform": "darwin", "hostname": "prod1.example.com", "architecture": "x64"}, "service": {"name": "1234_service-12a3", "language": {"version": "8", "name": "ecmascript"}, "agent": {"version": "3.14.0", "name": "elastic-node"}, "environment": "staging", "framework": {"version": "1.2.3", "name": "Express"}, "version": "5.1.3", "runtime": {"version": "8.0.0", "name": "node"}}}} diff --git a/testdata/intake-v2/ratelimit.ndjson b/testdata/intake-v2/ratelimit.ndjson new file mode 100644 index 00000000000..8d48399ed13 --- /dev/null +++ b/testdata/intake-v2/ratelimit.ndjson @@ -0,0 +1,20 @@ +{"metadata": { "process": {"pid": 1234 }, "service": {"name": "1234_service-12a3", "language": {"name": "ecmascript"}, "agent": {"version": "3.14.0", "name": "elastic-node"}}}} +{ "error": {"id": "abcdef0123456789", "timestamp": "2018-08-09T15:04:05.999Z","log": {"level": "custom log level","message": "Cannot read property 'baz' of undefined"}}} +{ "span": { "id": "0123456a89012345", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "ab23456a89012345", "transaction_id": "ab23456a89012345", "parent": 1, "name": "GET /api/types", "type": "request", "start": 1.845, "duration": 3.5642981, "stacktrace": [], "context": {} }} +{ "transaction": { "trace_id": "01234567890123456789abcdefabcdef", "id": "abcdef1478523690", "type": "request", "duration": 32.592981, "timestamp": "2018-08-30T18:53:27.154Z", "result": "200", "context": null, "spans": null, "sampled": null, "span_count": { "started": 0 }}} +{ "metricset": { "samples": { "go.memstats.heap.sys.bytes": { "value": 61235 } }, "timestamp": "2017-05-30T18:53:42.281Z" }} +{ "error": {"id": "abcdef0123456789", "timestamp": "2018-08-09T15:04:05.999Z","log": {"level": "custom log level","message": "Cannot read property 'baz' of undefined"}}} +{ "span": { "id": "0123456a89012345", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "ab23456a89012345", "transaction_id": "ab23456a89012345", "parent": 1, "name": "GET /api/types", "type": "request", "start": 1.845, "duration": 3.5642981, "stacktrace": [], "context": {} }} 
+{ "transaction": { "trace_id": "01234567890123456789abcdefabcdef", "id": "abcdef1478523690", "type": "request", "duration": 32.592981, "timestamp": "2018-08-30T18:53:27.154Z", "result": "200", "context": null, "spans": null, "sampled": null, "span_count": { "started": 0 }}} +{ "metricset": { "samples": { "go.memstats.heap.sys.bytes": { "value": 61235 } }, "timestamp": "2017-05-30T18:53:42.281Z" }} +{ "error": {"id": "abcdef0123456789", "timestamp": "2018-08-09T15:04:05.999Z","log": {"level": "custom log level","message": "Cannot read property 'baz' of undefined"}}} +{ "span": { "id": "0123456a89012345", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "ab23456a89012345", "transaction_id": "ab23456a89012345", "parent": 1, "name": "GET /api/types", "type": "request", "start": 1.845, "duration": 3.5642981, "stacktrace": [], "context": {} }} +{ "transaction": { "trace_id": "01234567890123456789abcdefabcdef", "id": "abcdef1478523690", "type": "request", "duration": 32.592981, "timestamp": "2018-08-30T18:53:27.154Z", "result": "200", "context": null, "spans": null, "sampled": null, "span_count": { "started": 0 }}} +{ "metricset": { "samples": { "go.memstats.heap.sys.bytes": { "value": 61235 } }, "timestamp": "2017-05-30T18:53:42.281Z" }} +{ "error": {"id": "abcdef0123456789", "timestamp": "2018-08-09T15:04:05.999Z","log": {"level": "custom log level","message": "Cannot read property 'baz' of undefined"}}} +{ "span": { "id": "0123456a89012345", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "ab23456a89012345", "transaction_id": "ab23456a89012345", "parent": 1, "name": "GET /api/types", "type": "request", "start": 1.845, "duration": 3.5642981, "stacktrace": [], "context": {} }} +{ "transaction": { "trace_id": "01234567890123456789abcdefabcdef", "id": "abcdef1478523690", "type": "request", "duration": 32.592981, "timestamp": "2018-08-30T18:53:27.154Z", "result": "200", "context": null, "spans": null, "sampled": null, "span_count": { "started": 0 }}} +{ "metricset": { "samples": { "go.memstats.heap.sys.bytes": { "value": 61235 } }, "timestamp": "2017-05-30T18:53:42.281Z" }} +{ "error": {"id": "abcdef0123456789", "timestamp": "2018-08-09T15:04:05.999Z","log": {"level": "custom log level","message": "Cannot read property 'baz' of undefined"}}} +{ "span": { "id": "0123456a89012345", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "ab23456a89012345", "transaction_id": "ab23456a89012345", "parent": 1, "name": "GET /api/types", "type": "request", "start": 1.845, "duration": 3.5642981, "stacktrace": [], "context": {} }} +{ "transaction": { "trace_id": "01234567890123456789abcdefabcdef", "id": "abcdef1478523690", "type": "request", "duration": 32.592981, "timestamp": "2018-08-30T18:53:27.154Z", "result": "200", "context": null, "spans": null, "sampled": null, "span_count": { "started": 0 }}} diff --git a/tests/system/apmserver.py b/tests/system/apmserver.py index 511450a6e92..d1609aca130 100644 --- a/tests/system/apmserver.py +++ b/tests/system/apmserver.py @@ -51,11 +51,11 @@ def get_transaction_payload_path(self, name="payload.json"): def get_metrics_payload_path(self, name="payload.json"): return self.get_payload_path("metric", name) - def get_transaction_v2_payload(self): - with open(self.get_transaction_v2_payload_path()) as f: + def get_event_v2_payload(self, name="events.ndjson"): + with open(self.get_event_v2_payload_path(name=name)) as f: return f.read() - def get_transaction_v2_payload_path(self, name="transactions.ndjson"): + def 
get_event_v2_payload_path(self, name="events.ndjson"): return self._beat_path_join( 'testdata', 'intake-v2', @@ -276,6 +276,8 @@ class ClientSideBaseTest(ServerBaseTest): errors_url = 'http://localhost:8200/v1/rum/errors' sourcemap_url = 'http://localhost:8200/assets/v1/sourcemaps' + intake_v2_url = 'http://localhost:8200/intake/v2/rum/events' + @classmethod def setUpClass(cls): super(ClientSideBaseTest, cls).setUpClass() diff --git a/tests/system/config/apm-server.yml.j2 b/tests/system/config/apm-server.yml.j2 index 0a0a44a53ed..4725064589a 100644 --- a/tests/system/config/apm-server.yml.j2 +++ b/tests/system/config/apm-server.yml.j2 @@ -13,6 +13,7 @@ apm-server: rum.allow_origins: {{ allow_origins }} rum.library_pattern: "~/test|library" rum.exclude_from_grouping: "~/test" + rum.event_rate.limit: 16 {% if smap_cache_expiration%} rum.source_mapping.cache.expiration: {{ smap_cache_expiration}} diff --git a/tests/system/test_access.py b/tests/system/test_access.py index 273f4f221ab..792463abc41 100644 --- a/tests/system/test_access.py +++ b/tests/system/test_access.py @@ -39,7 +39,7 @@ def test_with_token_v2(self): """ url = 'http://localhost:8200/intake/v2/events' - transactions = self.get_transaction_v2_payload() + transactions = self.get_event_v2_payload(name="transactions.ndjson") headers = {'content-type': 'application/x-ndjson'} def oauth(v): diff --git a/tests/system/test_requests.py b/tests/system/test_requests.py index a65cb2b298d..95bd33938ef 100644 --- a/tests/system/test_requests.py +++ b/tests/system/test_requests.py @@ -218,7 +218,6 @@ def test_preflight_bad_headers(self): class RateLimitTest(ClientSideBaseTest): def test_rate_limit(self): - transactions = self.get_transaction_payload() threads = [] codes = defaultdict(int) @@ -244,7 +243,6 @@ def fire(): assert fire() == 202 def test_rate_limit_multiple_ips(self): - transactions = self.get_transaction_payload() threads = [] codes = defaultdict(int) @@ -272,3 +270,77 @@ def fire(x): time.sleep(1) assert fire(0) == 202 assert fire(1) == 202 + + +class RateLimitV2Test(ClientSideBaseTest): + + def fire_events(self, data_file, iterations, split_ips=False): + transactions = self.get_event_v2_payload(name=data_file) + headers = {'content-type': 'application/x-ndjson'} + threads = [] + codes = defaultdict(int) + + def fire(x): + ip = '10.11.12.13' + if split_ips and x % 2: + ip = '10.11.12.14' + r = requests.post(self.intake_v2_url, + data=transactions, + headers={'content-type': 'application/x-ndjson', + 'X-Forwarded-For': ip}) + codes[r.status_code] += 1 + return r.status_code + + # rate limit hit, because every event in request is counted + for x in range(iterations): + threads.append(threading.Thread(target=fire, args=(x,))) + + for t in threads: + t.start() + time.sleep(0.01) + + for t in threads: + t.join() + return codes + + # limit: 16, burst_multiplier: 3, burst: 48 + def test_rate_limit(self): + # all requests from the same ip + # 19 events, batch size 10 => 20+1 events per requ + codes = self.fire_events("ratelimit.ndjson", 3) + assert set(codes.keys()) == set([202]), codes + + def test_rate_limit_hit(self): + # all requests from the same ip + codes = self.fire_events("ratelimit.ndjson", 5) + assert set(codes.keys()) == set([202, 429]), codes + assert codes[429] == 2, codes + assert codes[202] == 3, codes + + def test_rate_limit_small_hit(self): + # all requests from the same ip + # 4 events, batch size 10 => 10+1 events per requ + codes = self.fire_events("events.ndjson", 8) + assert set(codes.keys()) == set([202, 429]), 
codes + assert codes[429] == 3, codes + assert codes[202] == 5, codes + + def test_rate_limit_only_metadata(self): + # all requests from the same ip + # no events, batch size 10 => 10+1 events per requ + codes = self.fire_events("only-metadata.ndjson", 8) + assert set(codes.keys()) == set([202, 429]), codes + assert codes[429] == 3, codes + assert codes[202] == 5, codes + + def test_multiple_ips_rate_limit(self): + # requests from 2 different ips + codes = self.fire_events("ratelimit.ndjson", 6, True) + assert set(codes.keys()) == set([202]), codes + + def test_multiple_ips_rate_limit_hit(self): + # requests from 2 different ips + codes = self.fire_events("ratelimit.ndjson", 10, True) + assert set(codes.keys()) == set([202, 429]), codes + assert codes[429] == 4, codes + assert codes[202] == 6, codes
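
To tie the pieces together: for RUM requests the v2 handler checks rl.Allow() once up front and stores the limiter in the request context, and readBatch then reserves batchSize tokens per batch via WaitN under a one-second timeout, so short bursts are smoothed by waiting while sustained overload is reported as "rate limit exceeded" (HTTP 429). A standalone sketch of that reservation pattern, reusing the system-test numbers above (limit 16, burst 48, batches of 10); the helper name and values are illustrative, not from this patch:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// waitBatch mimics readBatch's throttling: reserve batchSize tokens,
// but give up after a bounded wait instead of blocking the request forever.
func waitBatch(lim *rate.Limiter, batchSize int) error {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return lim.WaitN(ctx, batchSize)
}

func main() {
	lim := rate.NewLimiter(rate.Limit(16), 48)

	// Six batches of 10 need 60 tokens; the burst covers 48 and the
	// 16/second refill covers the rest within the one-second timeout.
	for i := 0; i < 6; i++ {
		if err := waitBatch(lim, 10); err != nil {
			fmt.Println("batch", i, "rate limit exceeded:", err)
		} else {
			fmt.Println("batch", i, "accepted")
		}
	}

	// A single reservation larger than the burst can never succeed,
	// so WaitN fails immediately.
	fmt.Println(waitBatch(lim, 100))
}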