From 5c0766a44275174765892562948fdd261637e1ea Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Wed, 14 Jun 2017 18:03:38 -0400 Subject: [PATCH] Add random startup delay to each metricset (#4503) Add random startup delay to each metricset to avoid the thundering herd problem. Fixes #4010. --- CHANGELOG.asciidoc | 1 + metricbeat/_meta/common.full.yml | 4 +++ metricbeat/beater/config.go | 11 ++++++- metricbeat/beater/metricbeat.go | 11 +++---- .../configuration/metricbeat-options.asciidoc | 17 ++++++++++- metricbeat/mb/module/example_test.go | 4 +-- metricbeat/mb/module/factory.go | 12 +++++--- metricbeat/mb/module/runner_test.go | 2 +- metricbeat/mb/module/wrapper.go | 30 ++++++++++++++----- metricbeat/mb/module/wrapper_test.go | 6 ++-- metricbeat/metricbeat.full.yml | 4 +++ .../tests/system/config/metricbeat.yml.j2 | 3 ++ 12 files changed, 78 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index ca8249018c9..1baa4e5b059 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -59,6 +59,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d *Heartbeat* *Metricbeat* +- Add random startup delay to each metricset to avoid the thundering herd problem. {issue}4010[4010] *Packetbeat* diff --git a/metricbeat/_meta/common.full.yml b/metricbeat/_meta/common.full.yml index 08d72bcc4ca..11058bb32ed 100644 --- a/metricbeat/_meta/common.full.yml +++ b/metricbeat/_meta/common.full.yml @@ -21,3 +21,7 @@ metricbeat.config.modules: # Set to true to enable config reloading reload.enabled: false + +# Maximum amount of time to randomly delay the start of a metricset. Use 0 to +# disable startup delay. +metricbeat.max_start_delay: 10s diff --git a/metricbeat/beater/config.go b/metricbeat/beater/config.go index 7a2acbb9458..6e7e06ac632 100644 --- a/metricbeat/beater/config.go +++ b/metricbeat/beater/config.go @@ -1,10 +1,19 @@ package beater -import "github.com/elastic/beats/libbeat/common" +import ( + "time" + + "github.com/elastic/beats/libbeat/common" +) // Config is the root of the Metricbeat configuration hierarchy. type Config struct { // Modules is a list of module specific configuration data. Modules []*common.Config `config:"modules"` ReloadModules *common.Config `config:"config.modules"` + MaxStartDelay time.Duration `config:"max_start_delay"` // Upper bound on the random startup delay for metricsets (use 0 to disable startup delay). +} + +var defaultConfig = Config{ + MaxStartDelay: 10 * time.Second, } diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 33356c4f86e..e1dd812de6c 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -30,14 +30,12 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { // List all registered modules and metricsets. logp.Info("%s", mb.Registry.String()) - config := Config{} - - err := rawConfig.Unpack(&config) - if err != nil { + config := defaultConfig + if err := rawConfig.Unpack(&config); err != nil { return nil, errors.Wrap(err, "error reading configuration file") } - modules, err := module.NewWrappers(config.Modules, mb.Registry) + modules, err := module.NewWrappers(config.MaxStartDelay, config.Modules, mb.Registry) if err != nil { // Empty config is fine if dynamic config is enabled if !config.ReloadModules.Enabled() { @@ -61,7 +59,6 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { // that a single unresponsive host cannot inadvertently block other hosts // within the same Module and MetricSet from collection. func (bt *Metricbeat) Run(b *beat.Beat) error { - var wg sync.WaitGroup for _, m := range bt.modules { @@ -78,7 +75,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { if bt.config.ReloadModules.Enabled() { logp.Beta("feature dynamic configuration reloading is enabled.") moduleReloader := cfgfile.NewReloader(bt.config.ReloadModules) - factory := module.NewFactory(b.Publisher) + factory := module.NewFactory(bt.config.MaxStartDelay, b.Publisher) go moduleReloader.Run(factory) wg.Add(1) diff --git a/metricbeat/docs/reference/configuration/metricbeat-options.asciidoc b/metricbeat/docs/reference/configuration/metricbeat-options.asciidoc index 2351c3bc981..39ba49300c1 100644 --- a/metricbeat/docs/reference/configuration/metricbeat-options.asciidoc +++ b/metricbeat/docs/reference/configuration/metricbeat-options.asciidoc @@ -22,7 +22,22 @@ metricbeat.modules: hosts: ["root@tcp(127.0.0.1:3306)/"] ------------------------------------------------------------------------------ -==== Metricbeat Options +==== General Options + +===== max_start_delay + +The maximum random delay to apply to the startup of a metricset. Random delays +ranging from [0, _max_start_delay_) are applied to reduce the thundering herd +effect that can occur if a fleet of machines running Metricbeat are restarted at +the same time. Specifying a value of 0 disables the startup delay. The default +is 10s. + +[source,yaml] +---- +metricbeat.max_start_delay: 10s +---- + +==== Module Options You can specify the following options in the `metricbeat` section of the +{beatname_lc}.yml+ config file. These options are the same for all modules. Each module may have additional configuration options that are specific to that module. diff --git a/metricbeat/mb/module/example_test.go b/metricbeat/mb/module/example_test.go index 89798741169..e4b59f80b0f 100644 --- a/metricbeat/mb/module/example_test.go +++ b/metricbeat/mb/module/example_test.go @@ -28,7 +28,7 @@ func ExampleWrapper() { } // Create a new Wrapper based on the configuration. - m, err := module.NewWrapper(config, mb.Registry) + m, err := module.NewWrapper(0, config, mb.Registry) if err != nil { fmt.Println("Error:", err) return @@ -97,7 +97,7 @@ func ExampleRunner() { } // Create a new Wrapper based on the configuration. - m, err := module.NewWrapper(config, mb.Registry) + m, err := module.NewWrapper(0, config, mb.Registry) if err != nil { return } diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index 4a89483e01b..9d7d774576a 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -1,6 +1,8 @@ package module import ( + "time" + "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/publisher" @@ -9,18 +11,20 @@ import ( // Factory is used to register and reload modules type Factory struct { - client func() publisher.Client + client func() publisher.Client + maxStartDelay time.Duration } // NewFactory creates new Reloader instance for the given config -func NewFactory(p publisher.Publisher) *Factory { +func NewFactory(maxStartDelay time.Duration, p publisher.Publisher) *Factory { return &Factory{ - client: p.Connect, + client: p.Connect, + maxStartDelay: maxStartDelay, } } func (r *Factory) Create(c *common.Config) (cfgfile.Runner, error) { - w, err := NewWrapper(c, mb.Registry) + w, err := NewWrapper(r.maxStartDelay, c, mb.Registry) if err != nil { return nil, err } diff --git a/metricbeat/mb/module/runner_test.go b/metricbeat/mb/module/runner_test.go index 888ba006484..9215dfa5494 100644 --- a/metricbeat/mb/module/runner_test.go +++ b/metricbeat/mb/module/runner_test.go @@ -26,7 +26,7 @@ func TestRunner(t *testing.T) { } // Create a new Wrapper based on the configuration. - m, err := module.NewWrapper(config, mb.Registry) + m, err := module.NewWrapper(0, config, mb.Registry) if err != nil { t.Fatal(err) } diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index 184f2ca7fee..750976f4de4 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -2,6 +2,7 @@ package module import ( "fmt" + "math/rand" "sync" "time" @@ -36,9 +37,10 @@ var ( // Use NewWrapper or NewWrappers to construct new Wrappers. type Wrapper struct { mb.Module - filters *processors.Processors - metricSets []*metricSetWrapper // List of pointers to its associated MetricSets. - configHash uint64 + filters *processors.Processors + metricSets []*metricSetWrapper // List of pointers to its associated MetricSets. + configHash uint64 + maxStartDelay time.Duration } // metricSetWrapper contains the MetricSet and the private data associated with @@ -61,8 +63,8 @@ type stats struct { // NewWrapper create a new Module and its associated MetricSets based // on the given configuration. It constructs the supporting filters and stores // them in the Wrapper. -func NewWrapper(moduleConfig *common.Config, r *mb.Register) (*Wrapper, error) { - mws, err := NewWrappers([]*common.Config{moduleConfig}, r) +func NewWrapper(maxStartDelay time.Duration, moduleConfig *common.Config, r *mb.Register) (*Wrapper, error) { + mws, err := NewWrappers(maxStartDelay, []*common.Config{moduleConfig}, r) if err != nil { return nil, err } @@ -77,7 +79,7 @@ func NewWrapper(moduleConfig *common.Config, r *mb.Register) (*Wrapper, error) { // NewWrappers creates new Modules and their associated MetricSets based // on the given configuration. It constructs the supporting filters and stores // them all in a Wrapper. -func NewWrappers(modulesConfig []*common.Config, r *mb.Register) ([]*Wrapper, error) { +func NewWrappers(maxStartDelay time.Duration, modulesConfig []*common.Config, r *mb.Register) ([]*Wrapper, error) { modules, err := mb.NewModules(modulesConfig, r) if err != nil { return nil, err @@ -95,8 +97,9 @@ func NewWrappers(modulesConfig []*common.Config, r *mb.Register) ([]*Wrapper, er } mw := &Wrapper{ - Module: k, - filters: f, + Module: k, + filters: f, + maxStartDelay: maxStartDelay, } wrappers = append(wrappers, mw) @@ -188,6 +191,17 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- common.MapStr) defer logp.Recover(fmt.Sprintf("recovered from panic while fetching "+ "'%s/%s' for host '%s'", msw.module.Name(), msw.Name(), msw.Host())) + // Start each metricset randomly over a period of MaxDelayPeriod. + if msw.module.maxStartDelay > 0 { + delay := time.Duration(rand.Int63n(int64(msw.module.maxStartDelay))) + debugf("%v/%v will start after %v", msw.module.Name(), msw.Name(), delay) + select { + case <-done: + return + case <-time.After(delay): + } + } + debugf("Starting %s", msw) defer debugf("Stopped %s", msw) diff --git a/metricbeat/mb/module/wrapper_test.go b/metricbeat/mb/module/wrapper_test.go index cc9326bc67c..182924e1071 100644 --- a/metricbeat/mb/module/wrapper_test.go +++ b/metricbeat/mb/module/wrapper_test.go @@ -121,7 +121,7 @@ func TestWrapperOfEventFetcher(t *testing.T) { "hosts": hosts, }) - m, err := module.NewWrapper(c, newTestRegistry(t)) + m, err := module.NewWrapper(0, c, newTestRegistry(t)) if err != nil { t.Fatal(err) } @@ -154,7 +154,7 @@ func TestWrapperOfReportingFetcher(t *testing.T) { "hosts": hosts, }) - m, err := module.NewWrapper(c, newTestRegistry(t)) + m, err := module.NewWrapper(0, c, newTestRegistry(t)) if err != nil { t.Fatal(err) } @@ -187,7 +187,7 @@ func TestWrapperOfPushMetricSet(t *testing.T) { "hosts": hosts, }) - m, err := module.NewWrapper(c, newTestRegistry(t)) + m, err := module.NewWrapper(0, c, newTestRegistry(t)) if err != nil { t.Fatal(err) } diff --git a/metricbeat/metricbeat.full.yml b/metricbeat/metricbeat.full.yml index b763ea5309a..3559b69f910 100644 --- a/metricbeat/metricbeat.full.yml +++ b/metricbeat/metricbeat.full.yml @@ -22,6 +22,10 @@ metricbeat.config.modules: # Set to true to enable config reloading reload.enabled: false +# Maximum amount of time to randomly delay the start of a metricset. Use 0 to +# disable startup delay. +metricbeat.max_start_delay: 10s + #========================== Modules configuration ============================ metricbeat.modules: diff --git a/metricbeat/tests/system/config/metricbeat.yml.j2 b/metricbeat/tests/system/config/metricbeat.yml.j2 index ec5e49449c8..d9ff62b1943 100644 --- a/metricbeat/tests/system/config/metricbeat.yml.j2 +++ b/metricbeat/tests/system/config/metricbeat.yml.j2 @@ -98,6 +98,9 @@ metricbeat.config.modules: reload.enabled: true {% endif -%} +# Disable random start delay for metricsets. +metricbeat.max_start_delay: 0 + #================================ General ===================================== # The name of the shipper that publishes the network data. It can be used to group