Skip to content

Commit

Permalink
[v2] add RUM event rate limit per ip per sec(elastic#1367)
Browse files Browse the repository at this point in the history
* Enabled for RUM endpoints with default value 300, burst multiplier 3
* Throttle reading if the rate limit is hit while the request is being read; deny the request if the limit is already hit when the request starts.
* Check rate limit in batches of 10.
* Use eviction handling for LRU cache holding the rate limiters. 
* Keep rate limiting for v1 unchanged.

partly implements elastic#1299
  • Loading branch information
simitt authored and Ron cohen committed Oct 16, 2018
1 parent bfb4673 commit 016a77e
Show file tree
Hide file tree
Showing 36 changed files with 651 additions and 107 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ https://github.com/elastic/apm-server/compare/6.4\...master[View commits]
- Require either `message` or `type` for `error.exception` {pull}1354[1354].
- Require `span.parent_id`, forbid `null` for `span.trace_id`, `transaction.trace_id` {pull}1391[1391]
- Require `error.id` and changed description to 128 random bits ID {pull}1384[1384]
- Add rate limit handling per event {pull}1367[1367]

[[release-notes-6.4]]
== APM Server version 6.4
Expand Down
18 changes: 18 additions & 0 deletions _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,26 @@ apm-server:
# To enable real user monitoring (RUM) support set this to true.
#enabled: false

#-- v1 RUM endpoint

# Rate limit per second and IP address for requests sent to the RUM endpoint.
#rate_limit: 10

#-- v2 RUM endpoint

#event_rate:

# Defines the maximum number of events allowed to be sent to the APM Server v2 RUM
# endpoint per IP per second. Defaults to 300.
#limit: 300

# An LRU cache is used to keep a rate limit per IP for the most recently seen IPs.
# This setting defines the number of unique IPs that can be tracked in the cache.
# Sites with many concurrent clients should consider increasing this limit. Defaults to 1000.
#lru_size: 1000

#-- General RUM settings

# Comma separated list of permitted origins for real user monitoring.
# User-agents will send an origin header that will be validated against this list.
# An origin is made of a protocol scheme, host and port, without the url path.
Expand Down Expand Up @@ -90,6 +107,7 @@ apm-server:
# is changed, a matching index pattern needs to be specified here.
#index_pattern: "apm-*-sourcemap*"


# If set to true, APM Server augments data received by the agent with the original IP of the backend server,
# or the IP and User Agent of the real user (RUM requests). It defaults to true.
#capture_personal_data: true
Expand Down
18 changes: 18 additions & 0 deletions apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,26 @@ apm-server:
# To enable real user monitoring (RUM) support set this to true.
#enabled: false

#-- v1 RUM endpoint

# Rate limit per second and IP address for requests sent to the RUM endpoint.
#rate_limit: 10

#-- v2 RUM endpoint

#event_rate:

# Defines the maximum number of events allowed to be sent to the APM Server v2 RUM
# endpoint per IP per second. Defaults to 300.
#limit: 300

# An LRU cache is used to keep a rate limit per IP for the most recently seen IPs.
# This setting defines the number of unique IPs that can be tracked in the cache.
# Sites with many concurrent clients should consider increasing this limit. Defaults to 1000.
#lru_size: 1000

#-- General RUM settings

# Comma separated list of permitted origins for real user monitoring.
# User-agents will send an origin header that will be validated against this list.
# An origin is made of a protocol scheme, host and port, without the url path.
Expand Down Expand Up @@ -90,6 +107,7 @@ apm-server:
# is changed, a matching index pattern needs to be specified here.
#index_pattern: "apm-*-sourcemap*"


# If set to true, APM Server augments data received by the agent with the original IP of the backend server,
# or the IP and User Agent of the real user (RUM requests). It defaults to true.
#capture_personal_data: true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"accepted": 30,
"errors": [
{
"message": "rate limit exceeded"
}
]
}
16 changes: 3 additions & 13 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@
package beater

import (
"fmt"
"net"
"net/http"
"net/url"
"os"
"regexp"
"sync"
"time"

"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

"github.com/elastic/apm-agent-go"
Expand All @@ -52,18 +49,11 @@ type beater struct {
// Creates beater
func New(b *beat.Beat, ucfg *common.Config) (beat.Beater, error) {
logger := logp.NewLogger("beater")
beaterConfig := defaultConfig(b.Info.Version)
if err := ucfg.Unpack(beaterConfig); err != nil {
return nil, errors.Wrap(err, "Error processing configuration")
beaterConfig, err := NewConfig(b.Info.Version, ucfg)
if err != nil {
return nil, err
}
beaterConfig.SetRumConfig()
if beaterConfig.RumConfig.isEnabled() {
if _, err := regexp.Compile(beaterConfig.RumConfig.LibraryPattern); err != nil {
return nil, errors.New(fmt.Sprintf("Invalid regex for `library_pattern`: %v", err.Error()))
}
if _, err := regexp.Compile(beaterConfig.RumConfig.ExcludeFromGrouping); err != nil {
return nil, errors.New(fmt.Sprintf("Invalid regex for `exclude_from_grouping`: %v", err.Error()))
}
if b.Config != nil && beaterConfig.RumConfig.SourceMapping.EsConfig == nil {
// fall back to elasticsearch output configuration for sourcemap storage if possible
if isElasticsearchOutput(b) {
Expand Down
39 changes: 31 additions & 8 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,12 @@ func TestBeatConfig(t *testing.T) {
"url": "/debug/vars",
},
"frontend": map[string]interface{}{
"enabled": true,
"rate_limit": 1000,
"enabled": true,
"rate_limit": 1000,
"event_rate": map[string]interface{}{
"limit": 7200,
"lru_size": 2000,
},
"allow_origins": []string{"example*"},
"source_mapping": map[string]interface{}{
"cache": map[string]interface{}{
Expand Down Expand Up @@ -128,8 +132,12 @@ func TestBeatConfig(t *testing.T) {
Url: "/debug/vars",
},
FrontendConfig: &rumConfig{
Enabled: &truthy,
RateLimit: 1000,
Enabled: &truthy,
RateLimit: 1000,
EventRate: &eventRate{
Limit: 7200,
LruSize: 2000,
},
AllowOrigins: []string{"example*"},
SourceMapping: &SourceMapping{
Cache: &Cache{Expiration: 8 * time.Minute},
Expand All @@ -140,8 +148,12 @@ func TestBeatConfig(t *testing.T) {
beatVersion: "6.2.0",
},
RumConfig: &rumConfig{
Enabled: &truthy,
RateLimit: 1000,
Enabled: &truthy,
RateLimit: 1000,
EventRate: &eventRate{
Limit: 7200,
LruSize: 2000,
},
AllowOrigins: []string{"example*"},
SourceMapping: &SourceMapping{
Cache: &Cache{Expiration: 8 * time.Minute},
Expand Down Expand Up @@ -182,6 +194,9 @@ func TestBeatConfig(t *testing.T) {
"frontend": map[string]interface{}{
"enabled": true,
"rate_limit": 890,
"event_rate": map[string]interface{}{
"lru_size": 200,
},
"source_mapping": map[string]interface{}{
"cache": map[string]interface{}{
"expiration": 4,
Expand Down Expand Up @@ -224,6 +239,10 @@ func TestBeatConfig(t *testing.T) {
FrontendConfig: &rumConfig{
Enabled: &truthy,
RateLimit: 890,
EventRate: &eventRate{
Limit: 300,
LruSize: 200,
},
SourceMapping: &SourceMapping{
Cache: &Cache{
Expiration: 4 * time.Second,
Expand All @@ -236,8 +255,12 @@ func TestBeatConfig(t *testing.T) {
beatVersion: "6.2.0",
},
RumConfig: &rumConfig{
Enabled: &truthy,
RateLimit: 10,
Enabled: &truthy,
RateLimit: 10,
EventRate: &eventRate{
Limit: 300,
LruSize: 1000,
},
AllowOrigins: []string{"*"},
SourceMapping: &SourceMapping{
Cache: &Cache{
Expand Down
39 changes: 19 additions & 20 deletions beater/common_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ import (
"strings"
"time"

"github.com/gofrs/uuid"
"github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/ryanuber/go-glob"
"golang.org/x/time/rate"

"github.com/elastic/apm-server/decoder"
"github.com/elastic/apm-server/processor"
"github.com/elastic/apm-server/publish"
Expand All @@ -42,12 +36,14 @@ import (
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/version"
"github.com/gofrs/uuid"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/ryanuber/go-glob"
"golang.org/x/time/rate"
)

const (
rateLimitCacheSize = 1000
rateLimitBurstMultiplier = 2

supportedHeaders = "Content-Type, Content-Encoding, Accept"
supportedMethods = "POST, OPTIONS"
)
Expand Down Expand Up @@ -126,10 +122,11 @@ var (
counter: validateCounter,
}
}
rateLimitCounter = counter("response.errors.ratelimit")
rateLimitedResponse = serverResponse{
err: errors.New("too many requests"),
code: http.StatusTooManyRequests,
counter: counter("response.errors.ratelimit"),
counter: rateLimitCounter,
}
methodNotAllowedCounter = counter("response.errors.method")
methodNotAllowedResponse = serverResponse{
Expand Down Expand Up @@ -172,7 +169,7 @@ func newMuxer(beaterConfig *Config, report publish.Reporter) *http.ServeMux {
for path, route := range V2Routes {
logger.Infof("Path %s added to request handler", path)

mux.Handle(path, route.Handler(beaterConfig, report))
mux.Handle(path, route.Handler(path, beaterConfig, report))
}

mux.Handle(rootURL, rootHandler(beaterConfig.SecretToken))
Expand Down Expand Up @@ -244,9 +241,11 @@ func rootHandler(secretToken string) http.Handler {
return logHandler(handler)
}

type contextKey string
type reqLoggerKey struct{}

const reqLoggerContextKey = contextKey("requestLogger")
// ContextWithReqLogger returns a context derived from ctx that carries the
// given request logger. The logger is stored under the zero value of the
// reqLoggerKey type, which is the same key requestLogger looks up.
func ContextWithReqLogger(ctx context.Context, rl *logp.Logger) context.Context {
	var key reqLoggerKey
	return context.WithValue(ctx, key, rl)
}

func logHandler(h http.Handler) http.Handler {
logger := logp.NewLogger("request")
Expand All @@ -266,13 +265,8 @@ func logHandler(h http.Handler) http.Handler {
"remote_address", utility.RemoteAddr(r),
"user-agent", r.Header.Get("User-Agent"))

lr := r.WithContext(
context.WithValue(r.Context(), reqLoggerContextKey, reqLogger),
)

lw := utility.NewRecordingResponseWriter(w)

h.ServeHTTP(lw, lr)
h.ServeHTTP(lw, r.WithContext(ContextWithReqLogger(r.Context(), reqLogger)))

if lw.Code <= 399 {
reqLogger.Infow("handled request", []interface{}{"response_code", lw.Code}...)
Expand All @@ -290,7 +284,7 @@ func requestTimeHandler(h http.Handler) http.Handler {
// requestLogger is a convenience function to retrieve the logger that was
// added to the request context by the `logHandler` handler.
func requestLogger(r *http.Request) *logp.Logger {
logger, ok := r.Context().Value(reqLoggerContextKey).(*logp.Logger)
logger, ok := r.Context().Value(reqLoggerKey{}).(*logp.Logger)
if !ok {
logger = logp.NewLogger("request")
}
Expand All @@ -307,6 +301,11 @@ func killSwitchHandler(killSwitch bool, h http.Handler) http.Handler {
})
}

// Settings used by ipRateLimitHandler.
const (
// rateLimitCacheSize is the size handed to lru.New when ipRateLimitHandler
// creates its cache.
rateLimitCacheSize = 1000
// rateLimitBurstMultiplier is presumably multiplied with the configured rate
// limit to derive the allowed burst size; its use is not visible here, so
// verify against ipRateLimitHandler.
rateLimitBurstMultiplier = 2
)

func ipRateLimitHandler(rateLimit int, h http.Handler) http.Handler {
cache, _ := lru.New(rateLimitCacheSize)

Expand Down
33 changes: 31 additions & 2 deletions beater/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/go-ucfg"
"github.com/pkg/errors"
)

const defaultPort = "8200"
Expand Down Expand Up @@ -64,6 +65,7 @@ type ExpvarConfig struct {
type rumConfig struct {
Enabled *bool `config:"enabled"`
RateLimit int `config:"rate_limit"`
EventRate *eventRate `config:"event_rate"`
AllowOrigins []string `config:"allow_origins"`
LibraryPattern string `config:"library_pattern"`
ExcludeFromGrouping string `config:"exclude_from_grouping"`
Expand All @@ -72,6 +74,11 @@ type rumConfig struct {
beatVersion string
}

// eventRate holds the `event_rate` settings of the RUM configuration, which
// control per-IP event rate limiting on the v2 RUM endpoint.
type eventRate struct {
// Limit is the maximum number of events accepted per IP per second
// (config key `limit`; defaultRum sets it to 300).
Limit int `config:"limit"`
// LruSize is the number of unique IPs tracked in the LRU cache of rate
// limiters (config key `lru_size`; defaultRum sets it to 1000).
LruSize int `config:"lru_size"`
}

type metricsConfig struct {
Enabled *bool `config:"enabled"`
}
Expand Down Expand Up @@ -136,6 +143,24 @@ type InstrumentationConfig struct {
SecretToken string `config:"secret_token"`
}

// NewConfig builds the beater configuration for the given beat version.
//
// It starts from defaultConfig(version), unpacks ucfg on top of it and then
// calls setRumConfig. If RUM is enabled, the `library_pattern` and
// `exclude_from_grouping` settings must compile as regular expressions.
// On any failure a nil Config is returned together with the error.
func NewConfig(version string, ucfg *common.Config) (*Config, error) {
	cfg := defaultConfig(version)
	unpackErr := ucfg.Unpack(cfg)
	if unpackErr != nil {
		return nil, errors.Wrap(unpackErr, "Error processing configuration")
	}
	cfg.setRumConfig()

	// The regex settings are only validated when RUM is switched on.
	if !cfg.RumConfig.isEnabled() {
		return cfg, nil
	}

	_, libErr := regexp.Compile(cfg.RumConfig.LibraryPattern)
	if libErr != nil {
		return nil, errors.New(fmt.Sprintf("Invalid regex for `library_pattern`: %v", libErr.Error()))
	}

	_, groupErr := regexp.Compile(cfg.RumConfig.ExcludeFromGrouping)
	if groupErr != nil {
		return nil, errors.New(fmt.Sprintf("Invalid regex for `exclude_from_grouping`: %v", groupErr.Error()))
	}

	return cfg, nil
}

func (c *Config) setSmapElasticsearch(esConfig *common.Config) {
if c != nil && c.RumConfig.isEnabled() && c.RumConfig.SourceMapping != nil {
c.RumConfig.SourceMapping.EsConfig = esConfig
Expand Down Expand Up @@ -170,7 +195,7 @@ func (c *pipelineConfig) shouldOverwrite() bool {
return c != nil && (c.Overwrite != nil && *c.Overwrite)
}

func (c *Config) SetRumConfig() {
func (c *Config) setRumConfig() {
if c.RumConfig != nil && c.RumConfig.Enabled != nil {
return
}
Expand Down Expand Up @@ -210,7 +235,11 @@ func replaceVersion(pattern, version string) string {

func defaultRum(beatVersion string) *rumConfig {
return &rumConfig{
RateLimit: 10,
RateLimit: 10,
EventRate: &eventRate{
Limit: 300,
LruSize: 1000,
},
AllowOrigins: []string{"*"},
SourceMapping: &SourceMapping{
Cache: &Cache{
Expand Down
Loading

0 comments on commit 016a77e

Please sign in to comment.