diff --git a/_meta/beat.yml b/_meta/beat.yml index dbbd823f720..1fcf360535a 100644 --- a/_meta/beat.yml +++ b/_meta/beat.yml @@ -52,6 +52,11 @@ apm-server: # Rate limit per second and IP address for requests sent to the RUM endpoint. #rate_limit: 10 + # Settings concerning only requests sent to `/v2/rum/intake` endpoint: + # Rate limit per second and IP address for amount of events sent to the RUM endpoint. + # Defaults to 5000. + #event_rate_limit: 5000 + # Comma separated list of permitted origins for real user monitoring. # User-agents will send an origin header that will be validated against this list. # An origin is made of a protocol scheme, host and port, without the url path. diff --git a/apm-server.yml b/apm-server.yml index 40cf38bf55c..d87e4e68ade 100644 --- a/apm-server.yml +++ b/apm-server.yml @@ -52,6 +52,11 @@ apm-server: # Rate limit per second and IP address for requests sent to the RUM endpoint. #rate_limit: 10 + # Settings concerning only requests sent to `/v2/rum/intake` endpoint: + # Rate limit per second and IP address for amount of events sent to the RUM endpoint. + # Defaults to 5000. + #event_rate_limit: 5000 + # Comma separated list of permitted origins for real user monitoring. # User-agents will send an origin header that will be validated against this list. # An origin is made of a protocol scheme, host and port, without the url path. diff --git a/beater/beater.go b/beater/beater.go index 370420bb8a3..01387fe5a02 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -18,16 +18,13 @@ package beater import ( - "fmt" "net" "net/http" "net/url" "os" - "regexp" "sync" "time" - "github.com/pkg/errors" "golang.org/x/sync/errgroup" "github.com/elastic/apm-agent-go" @@ -52,18 +49,11 @@ type beater struct { // Creates beater func New(b *beat.Beat, ucfg *common.Config) (beat.Beater, error) { logger := logp.NewLogger("beater") - beaterConfig := defaultConfig(b.Info.Version) - if err := ucfg.Unpack(beaterConfig); err != nil { - return nil, errors.Wrap(err, "Error processing configuration") + beaterConfig, err := NewConfig(b.Info.Version, ucfg) + if err != nil { + return nil, err } - beaterConfig.SetRumConfig() if beaterConfig.RumConfig.isEnabled() { - if _, err := regexp.Compile(beaterConfig.RumConfig.LibraryPattern); err != nil { - return nil, errors.New(fmt.Sprintf("Invalid regex for `library_pattern`: %v", err.Error())) - } - if _, err := regexp.Compile(beaterConfig.RumConfig.ExcludeFromGrouping); err != nil { - return nil, errors.New(fmt.Sprintf("Invalid regex for `exclude_from_grouping`: %v", err.Error())) - } if b.Config != nil && beaterConfig.RumConfig.SourceMapping.EsConfig == nil { // fall back to elasticsearch output configuration for sourcemap storage if possible if isElasticsearchOutput(b) { diff --git a/beater/beater_test.go b/beater/beater_test.go index 8b186ec44a3..dad91fab6e7 100644 --- a/beater/beater_test.go +++ b/beater/beater_test.go @@ -37,6 +37,7 @@ import ( "github.com/elastic/apm-agent-go" "github.com/elastic/apm-server/publish" + "github.com/elastic/apm-server/rate" "github.com/elastic/apm-server/tests/loader" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" @@ -50,6 +51,10 @@ import ( func TestBeatConfig(t *testing.T) { falsy, truthy := false, true + dc := defaultConfig("6.2.0") + dc.rateLimitHandler = &rate.NilLimitHandler{} + dc.RumConfig.rateLimitHandler = rate.NewLimitHandler(dc.RumConfig.EventRateLimit) + dc.FrontendConfig.rateLimitHandler = rate.NewLimitHandler(dc.RumConfig.EventRateLimit) tests := []struct { conf map[string]interface{} @@ -59,7 +64,7 @@ func TestBeatConfig(t *testing.T) { }{ { conf: map[string]interface{}{}, - beaterConf: defaultConfig("6.2.0"), + beaterConf: dc, msg: "Default config created for empty config.", }, { @@ -84,9 +89,10 @@ func TestBeatConfig(t *testing.T) { "url": "/debug/vars", }, "frontend": map[string]interface{}{ - "enabled": true, - "rate_limit": 1000, - "allow_origins": []string{"example*"}, + "enabled": true, + "rate_limit": 1000, + "event_rate_limit": 7200, + "allow_origins": []string{"example*"}, "source_mapping": map[string]interface{}{ "cache": map[string]interface{}{ "expiration": 8 * time.Minute, @@ -125,9 +131,10 @@ func TestBeatConfig(t *testing.T) { Url: "/debug/vars", }, FrontendConfig: &rumConfig{ - Enabled: &truthy, - RateLimit: 1000, - AllowOrigins: []string{"example*"}, + Enabled: &truthy, + RateLimit: 1000, + EventRateLimit: 7200, + AllowOrigins: []string{"example*"}, SourceMapping: &SourceMapping{ Cache: &Cache{Expiration: 8 * time.Minute}, IndexPattern: "apm-test*", @@ -135,11 +142,13 @@ func TestBeatConfig(t *testing.T) { LibraryPattern: "^custom", ExcludeFromGrouping: "^grouping", beatVersion: "6.2.0", + rateLimitHandler: rate.NewLimitHandler(7200), }, RumConfig: &rumConfig{ - Enabled: &truthy, - RateLimit: 1000, - AllowOrigins: []string{"example*"}, + Enabled: &truthy, + RateLimit: 1000, + EventRateLimit: 7200, + AllowOrigins: []string{"example*"}, SourceMapping: &SourceMapping{ Cache: &Cache{Expiration: 8 * time.Minute}, IndexPattern: "apm-test*", @@ -147,6 +156,7 @@ func TestBeatConfig(t *testing.T) { LibraryPattern: "^custom", ExcludeFromGrouping: "^grouping", beatVersion: "6.2.0", + rateLimitHandler: rate.NewLimitHandler(7200), }, Metrics: &metricsConfig{ Enabled: &falsy, @@ -161,6 +171,7 @@ func TestBeatConfig(t *testing.T) { }, }, }, + rateLimitHandler: &rate.NilLimitHandler{}, }, msg: "Given config overwrites default", }, @@ -186,7 +197,8 @@ func TestBeatConfig(t *testing.T) { }, }, "rum": map[string]interface{}{ - "enabled": true, + "enabled": true, + "event_rate_limit": 7200, "source_mapping": map[string]interface{}{ "cache": map[string]interface{}{ "expiration": 7, @@ -218,8 +230,9 @@ func TestBeatConfig(t *testing.T) { Url: "/debug/vars", }, FrontendConfig: &rumConfig{ - Enabled: &truthy, - RateLimit: 890, + Enabled: &truthy, + RateLimit: 890, + EventRateLimit: 5000, SourceMapping: &SourceMapping{ Cache: &Cache{ Expiration: 4 * time.Second, @@ -232,9 +245,10 @@ func TestBeatConfig(t *testing.T) { beatVersion: "6.2.0", }, RumConfig: &rumConfig{ - Enabled: &truthy, - RateLimit: 10, - AllowOrigins: []string{"*"}, + Enabled: &truthy, + RateLimit: 10, + EventRateLimit: 7200, + AllowOrigins: []string{"*"}, SourceMapping: &SourceMapping{ Cache: &Cache{ Expiration: 7 * time.Second, @@ -244,6 +258,7 @@ func TestBeatConfig(t *testing.T) { LibraryPattern: "rum", ExcludeFromGrouping: "^/webpack", beatVersion: "6.2.0", + rateLimitHandler: rate.NewLimitHandler(7200), }, Metrics: &metricsConfig{ Enabled: &truthy, @@ -258,6 +273,7 @@ func TestBeatConfig(t *testing.T) { }, }, }, + rateLimitHandler: &rate.NilLimitHandler{}, }, msg: "Given config merged with default", }, diff --git a/beater/common_handlers.go b/beater/common_handlers.go index 90aaa3a5047..d2014af0a9a 100644 --- a/beater/common_handlers.go +++ b/beater/common_handlers.go @@ -27,15 +27,14 @@ import ( "strings" "time" - "github.com/hashicorp/golang-lru" "github.com/pkg/errors" "github.com/ryanuber/go-glob" "github.com/satori/go.uuid" - "golang.org/x/time/rate" "github.com/elastic/apm-server/decoder" "github.com/elastic/apm-server/processor" "github.com/elastic/apm-server/publish" + "github.com/elastic/apm-server/rate" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/utility" @@ -46,9 +45,6 @@ import ( ) const ( - rateLimitCacheSize = 1000 - rateLimitBurstMultiplier = 2 - supportedHeaders = "Content-Type, Content-Encoding, Accept" supportedMethods = "POST, OPTIONS" ) @@ -161,7 +157,7 @@ func newMuxer(beaterConfig *Config, report publish.Reporter) *http.ServeMux { for path, route := range V2Routes { logger.Infof("Path %s added to request handler", path) - mux.Handle(path, route.Handler(beaterConfig, report)) + mux.Handle(path, route.Handler(route.rateLimitHandler(beaterConfig), beaterConfig, report)) } mux.Handle(rootURL, rootHandler(beaterConfig.SecretToken)) @@ -294,19 +290,9 @@ func killSwitchHandler(killSwitch bool, h http.Handler) http.Handler { }) } -func ipRateLimitHandler(rateLimit int, h http.Handler) http.Handler { - cache, _ := lru.New(rateLimitCacheSize) - - var deny = func(ip string) bool { - if !cache.Contains(ip) { - cache.Add(ip, rate.NewLimiter(rate.Limit(rateLimit), rateLimit*rateLimitBurstMultiplier)) - } - var limiter, _ = cache.Get(ip) - return !limiter.(*rate.Limiter).Allow() - } - +func ipRateLimitHandler(rh rate.LimitHandler, h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if deny(utility.RemoteAddr(r)) { + if !rh.Allow(utility.RemoteAddr(r)) { sendStatus(w, r, rateLimitedResponse) return } diff --git a/beater/config.go b/beater/config.go index d6209b09b9e..180677ee988 100644 --- a/beater/config.go +++ b/beater/config.go @@ -18,11 +18,15 @@ package beater import ( + "fmt" "net" "path/filepath" "regexp" "time" + "github.com/pkg/errors" + + "github.com/elastic/apm-server/rate" "github.com/elastic/apm-server/sourcemap" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/outputs" @@ -49,6 +53,8 @@ type Config struct { RumConfig *rumConfig `config:"rum"` FrontendConfig *rumConfig `config:"frontend"` Register *registerConfig `config:"register"` + + rateLimitHandler rate.LimitHandler } type ExpvarConfig struct { @@ -59,12 +65,14 @@ type ExpvarConfig struct { type rumConfig struct { Enabled *bool `config:"enabled"` RateLimit int `config:"rate_limit"` + EventRateLimit int `config:"event_rate_limit"` AllowOrigins []string `config:"allow_origins"` LibraryPattern string `config:"library_pattern"` ExcludeFromGrouping string `config:"exclude_from_grouping"` SourceMapping *SourceMapping `config:"source_mapping"` - beatVersion string + rateLimitHandler rate.LimitHandler + beatVersion string } type metricsConfig struct { @@ -107,6 +115,27 @@ type InstrumentationConfig struct { Environment *string `config:"environment"` } +func NewConfig(version string, ucfg *common.Config) (*Config, error) { + c := defaultConfig(version) + if err := ucfg.Unpack(c); err != nil { + return nil, errors.Wrap(err, "Error processing configuration") + } + + c.setRumConfig() + if c.RumConfig.isEnabled() { + if _, err := regexp.Compile(c.RumConfig.LibraryPattern); err != nil { + return nil, errors.New(fmt.Sprintf("Invalid regex for `library_pattern`: %v", err.Error())) + } + if _, err := regexp.Compile(c.RumConfig.ExcludeFromGrouping); err != nil { + return nil, errors.New(fmt.Sprintf("Invalid regex for `exclude_from_grouping`: %v", err.Error())) + } + } + c.RumConfig.rateLimitHandler = rate.NewLimitHandler(c.RumConfig.EventRateLimit) + c.rateLimitHandler = &rate.NilLimitHandler{} + + return c, nil +} + func (c *Config) setSmapElasticsearch(esConfig *common.Config) { if c != nil && c.RumConfig.isEnabled() && c.RumConfig.SourceMapping != nil { c.RumConfig.SourceMapping.EsConfig = esConfig @@ -141,7 +170,7 @@ func (c *pipelineConfig) shouldOverwrite() bool { return c != nil && (c.Overwrite != nil && *c.Overwrite) } -func (c *Config) SetRumConfig() { +func (c *Config) setRumConfig() { if c.RumConfig != nil && c.RumConfig.Enabled != nil { return } @@ -181,8 +210,9 @@ func replaceVersion(pattern, version string) string { func defaultRum(beatVersion string) *rumConfig { return &rumConfig{ - RateLimit: 10, - AllowOrigins: []string{"*"}, + RateLimit: 10, + EventRateLimit: 5000, + AllowOrigins: []string{"*"}, SourceMapping: &SourceMapping{ Cache: &Cache{ Expiration: 5 * time.Minute, diff --git a/beater/config_test.go b/beater/config_test.go index 3775eb7ad6a..3eb403a0fda 100644 --- a/beater/config_test.go +++ b/beater/config_test.go @@ -54,6 +54,7 @@ func TestConfig(t *testing.T) { "rum": { "enabled": true, "rate_limit": 800, + "event_rate_limit": 8000, "allow_origins": ["rum*"], "source_mapping": { "cache": { @@ -67,6 +68,7 @@ func TestConfig(t *testing.T) { "frontend": { "enabled": true, "rate_limit": 1000, + "event_rate_limit": 1000, "allow_origins": ["example*"], "source_mapping": { "cache": { @@ -97,9 +99,10 @@ func TestConfig(t *testing.T) { SecretToken: "1234random", SSL: &SSLConfig{Enabled: &truthy, Certificate: outputs.CertificateConfig{Certificate: "1234cert", Key: "1234key"}}, RumConfig: &rumConfig{ - Enabled: &truthy, - RateLimit: 800, - AllowOrigins: []string{"rum*"}, + Enabled: &truthy, + RateLimit: 800, + EventRateLimit: 8000, + AllowOrigins: []string{"rum*"}, SourceMapping: &SourceMapping{ Cache: &Cache{Expiration: 1 * time.Minute}, IndexPattern: "apm-rum-test*", @@ -108,9 +111,10 @@ func TestConfig(t *testing.T) { ExcludeFromGrouping: "group_pattern-rum", }, FrontendConfig: &rumConfig{ - Enabled: &truthy, - RateLimit: 1000, - AllowOrigins: []string{"example*"}, + Enabled: &truthy, + RateLimit: 1000, + EventRateLimit: 1000, + AllowOrigins: []string{"example*"}, SourceMapping: &SourceMapping{ Cache: &Cache{Expiration: 10 * time.Minute}, IndexPattern: "apm-test*", @@ -170,9 +174,10 @@ func TestConfig(t *testing.T) { }, }, RumConfig: &rumConfig{ - Enabled: nil, - RateLimit: 0, - AllowOrigins: nil, + Enabled: nil, + RateLimit: 0, + EventRateLimit: 0, + AllowOrigins: nil, SourceMapping: &SourceMapping{ IndexPattern: "", }, @@ -265,7 +270,7 @@ func TestIsRumEnabled(t *testing.T) { {c: &Config{RumConfig: &rumConfig{Enabled: &truthy}}, enabled: true}, {c: &Config{RumConfig: &rumConfig{Enabled: new(bool)}, FrontendConfig: &rumConfig{Enabled: &truthy}}, enabled: false}, } { - td.c.SetRumConfig() + td.c.setRumConfig() assert.Equal(t, td.enabled, td.c.RumConfig.isEnabled()) } @@ -301,7 +306,7 @@ func TestSetRum(t *testing.T) { {c: &Config{RumConfig: testRumConf, FrontendConfig: testFrontendConf}, rc: testRumConf}, } for _, test := range cases { - test.c.SetRumConfig() + test.c.setRumConfig() assert.Equal(t, test.rc, test.c.RumConfig) } } diff --git a/beater/route_config.go b/beater/route_config.go index ffbab716a1a..b0e82e2e7b5 100644 --- a/beater/route_config.go +++ b/beater/route_config.go @@ -21,12 +21,13 @@ import ( "net/http" "regexp" - "github.com/elastic/apm-server/processor/stream" + "github.com/elastic/apm-server/rate" "github.com/elastic/apm-server/processor" perr "github.com/elastic/apm-server/processor/error" "github.com/elastic/apm-server/processor/metric" "github.com/elastic/apm-server/processor/sourcemap" + "github.com/elastic/apm-server/processor/stream" "github.com/elastic/apm-server/processor/transaction" "github.com/elastic/apm-server/publish" @@ -72,16 +73,27 @@ var V1Routes = map[string]v1Route{ var V2Routes = map[string]v2Route{ V2BackendURL: v2BackendRoute, - V2RumURL: {rumRouteType}, + V2RumURL: v2RumRoute, } -var v2BackendRoute = v2Route{ - routeType{ - v2backendHandler, - systemMetadataDecoder, - func(*Config) transform.Config { return transform.Config{} }, - }, -} +var ( + v2BackendRoute = v2Route{ + routeType{ + v2backendHandler, + systemMetadataDecoder, + func(*Config) transform.Config { return transform.Config{} }, + }, + func(c *Config) rate.LimitHandler { return c.rateLimitHandler }, + } + v2RumRoute = v2Route{ + routeType{ + v2rumHandler, + userMetaDataDecoder, + rumTransformConfig, + }, + func(c *Config) rate.LimitHandler { return c.RumConfig.rateLimitHandler }, + } +) var ( backendRouteType = routeType{ @@ -120,6 +132,15 @@ func v2backendHandler(beaterConfig *Config, h http.Handler) http.Handler { authHandler(beaterConfig.SecretToken, h))) } +//TODO simi: this reduces allowed events by 1 +func v2rumHandler(beaterConfig *Config, h http.Handler) http.Handler { + return killSwitchHandler(beaterConfig.RumConfig.isEnabled(), + requestTimeHandler( + concurrencyLimitHandler(beaterConfig, + ipRateLimitHandler(beaterConfig.RumConfig.rateLimitHandler, + corsHandler(beaterConfig.RumConfig.AllowOrigins, h))))) +} + func backendHandler(beaterConfig *Config, h http.Handler) http.Handler { return logHandler( requestTimeHandler( @@ -131,7 +152,7 @@ func rumHandler(beaterConfig *Config, h http.Handler) http.Handler { return killSwitchHandler(beaterConfig.RumConfig.isEnabled(), requestTimeHandler( concurrencyLimitHandler(beaterConfig, - ipRateLimitHandler(beaterConfig.RumConfig.RateLimit, + ipRateLimitHandler(rate.NewLimitHandler(beaterConfig.RumConfig.RateLimit), corsHandler(beaterConfig.RumConfig.AllowOrigins, h))))) } @@ -189,9 +210,10 @@ func (v *v1Route) Handler(p processor.Processor, beaterConfig *Config, report pu type v2Route struct { routeType + rateLimitHandler func(c *Config) rate.LimitHandler } -func (v v2Route) Handler(beaterConfig *Config, report publish.Reporter) http.Handler { +func (v v2Route) Handler(h rate.LimitHandler, beaterConfig *Config, report publish.Reporter) http.Handler { reqDecoder := v.configurableDecoder( beaterConfig, func(*http.Request) (map[string]interface{}, error) { return map[string]interface{}{}, nil }, @@ -199,7 +221,7 @@ func (v v2Route) Handler(beaterConfig *Config, report publish.Reporter) http.Han v2Handler := v2Handler{ requestDecoder: reqDecoder, - streamProcessor: &stream.StreamProcessor{Tconfig: v.transformConfig(beaterConfig)}, + streamProcessor: stream.NewStreamProcessor(v.transformConfig(beaterConfig), h), } return v.wrappingHandler(beaterConfig, v2Handler.Handle(beaterConfig, report)) diff --git a/beater/v2_handler.go b/beater/v2_handler.go index 5e396dd5560..0623c7bb295 100644 --- a/beater/v2_handler.go +++ b/beater/v2_handler.go @@ -18,11 +18,13 @@ package beater import ( + "context" "encoding/json" "net/http" "github.com/elastic/apm-server/processor/stream" "github.com/elastic/apm-server/publish" + "github.com/elastic/apm-server/utility" "github.com/elastic/beats/libbeat/logp" @@ -53,6 +55,8 @@ func (v *v2Handler) statusCode(sr *stream.Result) int { code = http.StatusServiceUnavailable case stream.ServerErrType: code = http.StatusInternalServerError + case stream.RateLimitErrType: + code = http.StatusTooManyRequests default: code = http.StatusInternalServerError } @@ -109,7 +113,8 @@ func (v *v2Handler) Handle(beaterConfig *Config, report publish.Reporter) http.H return } - res := v.streamProcessor.HandleStream(r.Context(), reqMeta, ndReader, report) + ctx := context.WithValue(r.Context(), stream.RateLimitKey, utility.RemoteAddr(r)) + res := v.streamProcessor.HandleStream(ctx, reqMeta, ndReader, report) v.sendResponse(logger, w, res) }) diff --git a/beater/v2_handler_test.go b/beater/v2_handler_test.go index 1854b84aae3..b4d9e8f1380 100644 --- a/beater/v2_handler_test.go +++ b/beater/v2_handler_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/elastic/apm-server/publish" + "github.com/elastic/apm-server/rate" "github.com/elastic/apm-server/tests" "github.com/pkg/errors" @@ -41,7 +42,7 @@ func TestInvalidContentType(t *testing.T) { w := httptest.NewRecorder() c := defaultConfig("7.0.0") - handler := (&v2BackendRoute).Handler(c, nil) + handler := (&v2BackendRoute).Handler(nil, c, nil) handler.ServeHTTP(w, req) @@ -55,7 +56,7 @@ func TestEmptyRequest(t *testing.T) { w := httptest.NewRecorder() c := defaultConfig("7.0.0") - handler := (&v2BackendRoute).Handler(c, nil) + handler := (&v2BackendRoute).Handler(nil, c, nil) handler.ServeHTTP(w, req) @@ -79,9 +80,10 @@ func TestRequestDecoderError(t *testing.T) { func(*Config, decoder.ReqDecoder) decoder.ReqDecoder { return faultyDecoder }, func(*Config) transform.Config { return transform.Config{} }, }, + func(*Config) rate.LimitHandler { return &rate.NilLimitHandler{} }, } - handler := testRouteWithFaultyDecoder.Handler(c, nil) + handler := testRouteWithFaultyDecoder.Handler(nil, c, nil) handler.ServeHTTP(w, req) @@ -120,7 +122,7 @@ func TestRequestIntegration(t *testing.T) { report := func(context.Context, publish.PendingReq) error { return test.reportingErr } - handler := (&v2BackendRoute).Handler(c, report) + handler := (&v2BackendRoute).Handler(nil, c, report) handler.ServeHTTP(w, req) diff --git a/processor/stream/result.go b/processor/stream/result.go index 1e28b728587..fe2665e567b 100644 --- a/processor/stream/result.go +++ b/processor/stream/result.go @@ -42,6 +42,7 @@ const ( InvalidInputErrType ShuttingDownErrType ServerErrType + RateLimitErrType ) const ( diff --git a/processor/stream/stream_processor.go b/processor/stream/stream_processor.go index ef5a2cdd5c3..1cbd739708f 100644 --- a/processor/stream/stream_processor.go +++ b/processor/stream/stream_processor.go @@ -21,6 +21,7 @@ import ( "context" "errors" "io" + "time" "github.com/santhosh-tekuri/jsonschema" @@ -31,11 +32,16 @@ import ( "github.com/elastic/apm-server/model/span" "github.com/elastic/apm-server/model/transaction" "github.com/elastic/apm-server/publish" + "github.com/elastic/apm-server/rate" "github.com/elastic/apm-server/transform" "github.com/elastic/apm-server/utility" "github.com/elastic/apm-server/validation" ) +const ( + RateLimitKey = "rateLimitKey" +) + var ( ErrUnrecognizedObject = errors.New("did not recognize object type") ) @@ -47,7 +53,15 @@ type StreamReader interface { } type StreamProcessor struct { - Tconfig transform.Config + tconfig transform.Config + rateLimitHandler rate.LimitHandler +} + +func NewStreamProcessor(c transform.Config, rh rate.LimitHandler) *StreamProcessor { + if rh == nil { + rh = &rate.NilLimitHandler{} + } + return &StreamProcessor{tconfig: c, rateLimitHandler: rh} } const batchSize = 10 @@ -146,12 +160,39 @@ func (v *StreamProcessor) handleRawModel(rawModel map[string]interface{}) (trans // readBatch will read up to `batchSize` objects from the ndjson stream // it returns a slice of eventables and a bool that indicates if there might be more to read. -func (v *StreamProcessor) readBatch(batchSize int, reader StreamReader, response *Result) ([]transform.Transformable, bool) { - var err error - var rawModel map[string]interface{} +func (v *StreamProcessor) readBatch(ctx context.Context, batchSize int, reader StreamReader, response *Result) ([]transform.Transformable, bool) { + var ( + err error + rawModel map[string]interface{} + ctxT context.Context + cancel context.CancelFunc + ) + + key, ok := ctx.Value(RateLimitKey).(string) + if !ok { + key = "" + } var eventables []transform.Transformable for i := 0; i < batchSize && err == nil; i++ { + + //Throttle read in case rate limit is hit, by using + //blocking wait until rate limiter allows to read next event. + //Consider burst and timeout indicating how long to wait for rate limiter. + //Return early if rate limiter returns error. + if key != "" { + ctxT, cancel = context.WithTimeout(ctx, time.Second) + defer cancel() + err = v.rateLimitHandler.Wait(ctxT, key) + if err != nil { + response.Add(&Error{ + Type: RateLimitErrType, + Message: "rate limit exceeded", + }) + return eventables, true + } + + } rawModel, err = reader.Read() if err != nil && err != io.EOF { @@ -197,12 +238,12 @@ func (s *StreamProcessor) HandleStream(ctx context.Context, meta map[string]inte tctx := &transform.Context{ RequestTime: utility.RequestTime(ctx), - Config: s.Tconfig, + Config: s.tconfig, Metadata: *metadata, } for { - transformables, done := s.readBatch(batchSize, jsonReader, res) + transformables, done := s.readBatch(ctx, batchSize, jsonReader, res) if transformables != nil { err := report(ctx, publish.PendingReq{ Transformables: transformables, diff --git a/rate/limit.go b/rate/limit.go new file mode 100644 index 00000000000..d55f28ea10e --- /dev/null +++ b/rate/limit.go @@ -0,0 +1,72 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package rate + +import ( + "context" + + lru "github.com/hashicorp/golang-lru" + r "golang.org/x/time/rate" +) + +const ( + cacheSize = 1000 + burstMultiplier = 2 +) + +type LimitHandler interface { + Allow(key string) bool + Wait(ctx context.Context, key string) error +} + +func NewLimitHandler(limit int) LimitHandler { + if limit == -1 { + return &NilLimitHandler{} + } + c, _ := lru.New(cacheSize) + return &CacheLimitHandler{cache: c, limit: limit} +} + +// NilHandler +type NilLimitHandler struct{} + +func (h *NilLimitHandler) Allow(key string) bool { return true } +func (h *NilLimitHandler) Wait(ctx context.Context, key string) error { return nil } + +// LimitHandler using a LRU cache for storing keys +type CacheLimitHandler struct { + cache *lru.Cache + limit int +} + +func (h *CacheLimitHandler) Allow(key string) bool { + return h.getLimiter(key).Allow() +} + +func (h *CacheLimitHandler) Wait(ctx context.Context, key string) error { + return h.getLimiter(key).Wait(ctx) +} + +func (h *CacheLimitHandler) getLimiter(key string) *r.Limiter { + if !h.cache.Contains(key) { + h.cache.Add(key, r.NewLimiter(r.Limit(h.limit), h.limit*burstMultiplier)) + } + entry, _ := h.cache.Get(key) + limiter, _ := entry.(*r.Limiter) + return limiter +} diff --git a/rate/limit_test.go b/rate/limit_test.go new file mode 100644 index 00000000000..410b0fedca3 --- /dev/null +++ b/rate/limit_test.go @@ -0,0 +1,57 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package rate + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNilLimitHandler(t *testing.T) { + h := NewLimitHandler(-1) + // show that it is always true + for i := 0; i < 5; i++ { + assert.True(t, h.Allow("")) + assert.NoError(t, h.Wait(nil, "")) + } +} + +func TestCachedLimitHandler(t *testing.T) { + h := NewLimitHandler(1) + key := "foo" + ctx := context.Background() + + // allows for limit keys + assert.True(t, h.Allow(key)) + assert.True(t, h.Allow(key)) //considering burst + assert.False(t, h.Allow(key)) + + //does wait if deadline is not hit + assert.NoError(t, h.Wait(ctx, key)) + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + assert.NoError(t, h.Wait(ctx, key)) + + assert.False(t, h.Allow(key)) + ctx, cancel = context.WithTimeout(ctx, 0*time.Millisecond) + defer cancel() + assert.Error(t, h.Wait(ctx, key)) +}