Skip to content

Commit

Permalink
[v2] Add rate limit per event.
Browse files Browse the repository at this point in the history
* Enabled for RUM endpoints with default value 5000.
* Deny request if rate limit is hit when http request is established.
* Throttle read if http request is successfully established, but rate
  limit is hit while reading the request.
* Keep rate limiting for v1 unchanged.

part of elastic#1299
  • Loading branch information
simitt committed Sep 7, 2018
1 parent 77989a5 commit f79fbce
Show file tree
Hide file tree
Showing 14 changed files with 360 additions and 126 deletions.
5 changes: 5 additions & 0 deletions _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ apm-server:
# Rate limit per second and IP address for requests sent to the RUM endpoint.
#rate_limit: 10

# Settings concerning only requests sent to `/v2/rum/intake` endpoint:
# Rate limit per second and IP address for amount of events sent to the RUM endpoint.
# Defaults to 5000.
#event_rate_limit: 5000

# Comma separated list of permitted origins for real user monitoring.
# User-agents will send an origin header that will be validated against this list.
# An origin is made of a protocol scheme, host and port, without the url path.
Expand Down
5 changes: 5 additions & 0 deletions apm-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ apm-server:
# Rate limit per second and IP address for requests sent to the RUM endpoint.
#rate_limit: 10

# Settings concerning only requests sent to `/v2/rum/intake` endpoint:
# Rate limit per second and IP address for amount of events sent to the RUM endpoint.
# Defaults to 5000.
#event_rate_limit: 5000

# Comma separated list of permitted origins for real user monitoring.
# User-agents will send an origin header that will be validated against this list.
# An origin is made of a protocol scheme, host and port, without the url path.
Expand Down
16 changes: 3 additions & 13 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@
package beater

import (
"fmt"
"net"
"net/http"
"net/url"
"os"
"regexp"
"sync"
"time"

"github.com/pkg/errors"
"golang.org/x/sync/errgroup"

"github.com/elastic/apm-agent-go"
Expand All @@ -52,18 +49,11 @@ type beater struct {
// Creates beater
func New(b *beat.Beat, ucfg *common.Config) (beat.Beater, error) {
logger := logp.NewLogger("beater")
beaterConfig := defaultConfig(b.Info.Version)
if err := ucfg.Unpack(beaterConfig); err != nil {
return nil, errors.Wrap(err, "Error processing configuration")
beaterConfig, err := NewConfig(b.Info.Version, ucfg)
if err != nil {
return nil, err
}
beaterConfig.SetRumConfig()
if beaterConfig.RumConfig.isEnabled() {
if _, err := regexp.Compile(beaterConfig.RumConfig.LibraryPattern); err != nil {
return nil, errors.New(fmt.Sprintf("Invalid regex for `library_pattern`: %v", err.Error()))
}
if _, err := regexp.Compile(beaterConfig.RumConfig.ExcludeFromGrouping); err != nil {
return nil, errors.New(fmt.Sprintf("Invalid regex for `exclude_from_grouping`: %v", err.Error()))
}
if b.Config != nil && beaterConfig.RumConfig.SourceMapping.EsConfig == nil {
// fall back to elasticsearch output configuration for sourcemap storage if possible
if isElasticsearchOutput(b) {
Expand Down
48 changes: 32 additions & 16 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

"github.com/elastic/apm-agent-go"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/rate"
"github.com/elastic/apm-server/tests/loader"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
Expand All @@ -50,6 +51,10 @@ import (

func TestBeatConfig(t *testing.T) {
falsy, truthy := false, true
dc := defaultConfig("6.2.0")
dc.rateLimitHandler = &rate.NilLimitHandler{}
dc.RumConfig.rateLimitHandler = rate.NewLimitHandler(dc.RumConfig.EventRateLimit)
dc.FrontendConfig.rateLimitHandler = rate.NewLimitHandler(dc.RumConfig.EventRateLimit)

tests := []struct {
conf map[string]interface{}
Expand All @@ -59,7 +64,7 @@ func TestBeatConfig(t *testing.T) {
}{
{
conf: map[string]interface{}{},
beaterConf: defaultConfig("6.2.0"),
beaterConf: dc,
msg: "Default config created for empty config.",
},
{
Expand All @@ -84,9 +89,10 @@ func TestBeatConfig(t *testing.T) {
"url": "/debug/vars",
},
"frontend": map[string]interface{}{
"enabled": true,
"rate_limit": 1000,
"allow_origins": []string{"example*"},
"enabled": true,
"rate_limit": 1000,
"event_rate_limit": 7200,
"allow_origins": []string{"example*"},
"source_mapping": map[string]interface{}{
"cache": map[string]interface{}{
"expiration": 8 * time.Minute,
Expand Down Expand Up @@ -125,28 +131,32 @@ func TestBeatConfig(t *testing.T) {
Url: "/debug/vars",
},
FrontendConfig: &rumConfig{
Enabled: &truthy,
RateLimit: 1000,
AllowOrigins: []string{"example*"},
Enabled: &truthy,
RateLimit: 1000,
EventRateLimit: 7200,
AllowOrigins: []string{"example*"},
SourceMapping: &SourceMapping{
Cache: &Cache{Expiration: 8 * time.Minute},
IndexPattern: "apm-test*",
},
LibraryPattern: "^custom",
ExcludeFromGrouping: "^grouping",
beatVersion: "6.2.0",
rateLimitHandler: rate.NewLimitHandler(7200),
},
RumConfig: &rumConfig{
Enabled: &truthy,
RateLimit: 1000,
AllowOrigins: []string{"example*"},
Enabled: &truthy,
RateLimit: 1000,
EventRateLimit: 7200,
AllowOrigins: []string{"example*"},
SourceMapping: &SourceMapping{
Cache: &Cache{Expiration: 8 * time.Minute},
IndexPattern: "apm-test*",
},
LibraryPattern: "^custom",
ExcludeFromGrouping: "^grouping",
beatVersion: "6.2.0",
rateLimitHandler: rate.NewLimitHandler(7200),
},
Metrics: &metricsConfig{
Enabled: &falsy,
Expand All @@ -161,6 +171,7 @@ func TestBeatConfig(t *testing.T) {
},
},
},
rateLimitHandler: &rate.NilLimitHandler{},
},
msg: "Given config overwrites default",
},
Expand All @@ -186,7 +197,8 @@ func TestBeatConfig(t *testing.T) {
},
},
"rum": map[string]interface{}{
"enabled": true,
"enabled": true,
"event_rate_limit": 7200,
"source_mapping": map[string]interface{}{
"cache": map[string]interface{}{
"expiration": 7,
Expand Down Expand Up @@ -218,8 +230,9 @@ func TestBeatConfig(t *testing.T) {
Url: "/debug/vars",
},
FrontendConfig: &rumConfig{
Enabled: &truthy,
RateLimit: 890,
Enabled: &truthy,
RateLimit: 890,
EventRateLimit: 5000,
SourceMapping: &SourceMapping{
Cache: &Cache{
Expiration: 4 * time.Second,
Expand All @@ -232,9 +245,10 @@ func TestBeatConfig(t *testing.T) {
beatVersion: "6.2.0",
},
RumConfig: &rumConfig{
Enabled: &truthy,
RateLimit: 10,
AllowOrigins: []string{"*"},
Enabled: &truthy,
RateLimit: 10,
EventRateLimit: 7200,
AllowOrigins: []string{"*"},
SourceMapping: &SourceMapping{
Cache: &Cache{
Expiration: 7 * time.Second,
Expand All @@ -244,6 +258,7 @@ func TestBeatConfig(t *testing.T) {
LibraryPattern: "rum",
ExcludeFromGrouping: "^/webpack",
beatVersion: "6.2.0",
rateLimitHandler: rate.NewLimitHandler(7200),
},
Metrics: &metricsConfig{
Enabled: &truthy,
Expand All @@ -258,6 +273,7 @@ func TestBeatConfig(t *testing.T) {
},
},
},
rateLimitHandler: &rate.NilLimitHandler{},
},
msg: "Given config merged with default",
},
Expand Down
22 changes: 4 additions & 18 deletions beater/common_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@ import (
"strings"
"time"

"github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/ryanuber/go-glob"
"github.com/satori/go.uuid"
"golang.org/x/time/rate"

"github.com/elastic/apm-server/decoder"
"github.com/elastic/apm-server/processor"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/rate"
"github.com/elastic/apm-server/transform"

"github.com/elastic/apm-server/utility"
Expand All @@ -46,9 +45,6 @@ import (
)

const (
rateLimitCacheSize = 1000
rateLimitBurstMultiplier = 2

supportedHeaders = "Content-Type, Content-Encoding, Accept"
supportedMethods = "POST, OPTIONS"
)
Expand Down Expand Up @@ -161,7 +157,7 @@ func newMuxer(beaterConfig *Config, report publish.Reporter) *http.ServeMux {
for path, route := range V2Routes {
logger.Infof("Path %s added to request handler", path)

mux.Handle(path, route.Handler(beaterConfig, report))
mux.Handle(path, route.Handler(route.rateHandler(beaterConfig), beaterConfig, report))
}

mux.Handle(rootURL, rootHandler(beaterConfig.SecretToken))
Expand Down Expand Up @@ -294,19 +290,9 @@ func killSwitchHandler(killSwitch bool, h http.Handler) http.Handler {
})
}

func ipRateLimitHandler(rateLimit int, h http.Handler) http.Handler {
cache, _ := lru.New(rateLimitCacheSize)

var deny = func(ip string) bool {
if !cache.Contains(ip) {
cache.Add(ip, rate.NewLimiter(rate.Limit(rateLimit), rateLimit*rateLimitBurstMultiplier))
}
var limiter, _ = cache.Get(ip)
return !limiter.(*rate.Limiter).Allow()
}

func ipRateLimitHandler(rh rate.LimitHandler, h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if deny(utility.RemoteAddr(r)) {
if !rh.Allow(utility.RemoteAddr(r)) {
sendStatus(w, r, rateLimitedResponse)
return
}
Expand Down
40 changes: 36 additions & 4 deletions beater/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@
package beater

import (
"fmt"
"net"
"path/filepath"
"regexp"
"time"

"github.com/pkg/errors"

"github.com/elastic/apm-server/rate"
"github.com/elastic/apm-server/sourcemap"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs"
Expand All @@ -49,6 +53,8 @@ type Config struct {
RumConfig *rumConfig `config:"rum"`
FrontendConfig *rumConfig `config:"frontend"`
Register *registerConfig `config:"register"`

rateLimitHandler rate.LimitHandler
}

type ExpvarConfig struct {
Expand All @@ -59,12 +65,14 @@ type ExpvarConfig struct {
type rumConfig struct {
Enabled *bool `config:"enabled"`
RateLimit int `config:"rate_limit"`
EventRateLimit int `config:"event_rate_limit"`
AllowOrigins []string `config:"allow_origins"`
LibraryPattern string `config:"library_pattern"`
ExcludeFromGrouping string `config:"exclude_from_grouping"`
SourceMapping *SourceMapping `config:"source_mapping"`

beatVersion string
rateLimitHandler rate.LimitHandler
beatVersion string
}

type metricsConfig struct {
Expand Down Expand Up @@ -107,6 +115,29 @@ type InstrumentationConfig struct {
Environment *string `config:"environment"`
}

func NewConfig(version string, ucfg *common.Config) (*Config, error) {
c := defaultConfig(version)
if err := ucfg.Unpack(c); err != nil {
return nil, errors.Wrap(err, "Error processing configuration")
}

if c.RumConfig == nil || c.RumConfig.Enabled == nil {
c.RumConfig = c.FrontendConfig
}
if c.RumConfig.isEnabled() {
if _, err := regexp.Compile(c.RumConfig.LibraryPattern); err != nil {
return nil, errors.New(fmt.Sprintf("Invalid regex for `library_pattern`: %v", err.Error()))
}
if _, err := regexp.Compile(c.RumConfig.ExcludeFromGrouping); err != nil {
return nil, errors.New(fmt.Sprintf("Invalid regex for `exclude_from_grouping`: %v", err.Error()))
}
}
c.RumConfig.rateLimitHandler = rate.NewLimitHandler(c.RumConfig.EventRateLimit)
c.rateLimitHandler = &rate.NilLimitHandler{}

return c, nil
}

func (c *Config) setSmapElasticsearch(esConfig *common.Config) {
if c != nil && c.RumConfig.isEnabled() && c.RumConfig.SourceMapping != nil {
c.RumConfig.SourceMapping.EsConfig = esConfig
Expand Down Expand Up @@ -141,7 +172,7 @@ func (c *pipelineConfig) shouldOverwrite() bool {
return c != nil && (c.Overwrite != nil && *c.Overwrite)
}

func (c *Config) SetRumConfig() {
func (c *Config) setRumConfig() {
if c.RumConfig != nil && c.RumConfig.Enabled != nil {
return
}
Expand Down Expand Up @@ -181,8 +212,9 @@ func replaceVersion(pattern, version string) string {

func defaultRum(beatVersion string) *rumConfig {
return &rumConfig{
RateLimit: 10,
AllowOrigins: []string{"*"},
RateLimit: 10,
EventRateLimit: 5000,
AllowOrigins: []string{"*"},
SourceMapping: &SourceMapping{
Cache: &Cache{
Expiration: 5 * time.Minute,
Expand Down
Loading

0 comments on commit f79fbce

Please sign in to comment.