Skip to content

Commit

Permalink
Add random startup delay to each metricset (#4503)
Browse files Browse the repository at this point in the history
Add random startup delay to each metricset to avoid the thundering herd problem. Fixes #4010.
  • Loading branch information
andrewkroh authored and tsg committed Jun 14, 2017
1 parent 284a267 commit 5c0766a
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d
*Heartbeat*

*Metricbeat*
- Add random startup delay to each metricset to avoid the thundering herd problem. {issue}4010[4010]

*Packetbeat*

Expand Down
4 changes: 4 additions & 0 deletions metricbeat/_meta/common.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@ metricbeat.config.modules:

# Set to true to enable config reloading
reload.enabled: false

# Maximum amount of time to randomly delay the start of a metricset. Use 0 to
# disable startup delay.
metricbeat.max_start_delay: 10s
11 changes: 10 additions & 1 deletion metricbeat/beater/config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
package beater

import "github.com/elastic/beats/libbeat/common"
import (
"time"

"github.com/elastic/beats/libbeat/common"
)

// Config is the root of the Metricbeat configuration hierarchy.
type Config struct {
// Modules is a list of module specific configuration data.
Modules []*common.Config `config:"modules"`
ReloadModules *common.Config `config:"config.modules"`
MaxStartDelay time.Duration `config:"max_start_delay"` // Upper bound on the random startup delay for metricsets (use 0 to disable startup delay).
}

var defaultConfig = Config{
MaxStartDelay: 10 * time.Second,
}
11 changes: 4 additions & 7 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// List all registered modules and metricsets.
logp.Info("%s", mb.Registry.String())

config := Config{}

err := rawConfig.Unpack(&config)
if err != nil {
config := defaultConfig
if err := rawConfig.Unpack(&config); err != nil {
return nil, errors.Wrap(err, "error reading configuration file")
}

modules, err := module.NewWrappers(config.Modules, mb.Registry)
modules, err := module.NewWrappers(config.MaxStartDelay, config.Modules, mb.Registry)
if err != nil {
// Empty config is fine if dynamic config is enabled
if !config.ReloadModules.Enabled() {
Expand All @@ -61,7 +59,6 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// that a single unresponsive host cannot inadvertently block other hosts
// within the same Module and MetricSet from collection.
func (bt *Metricbeat) Run(b *beat.Beat) error {

var wg sync.WaitGroup

for _, m := range bt.modules {
Expand All @@ -78,7 +75,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
if bt.config.ReloadModules.Enabled() {
logp.Beta("feature dynamic configuration reloading is enabled.")
moduleReloader := cfgfile.NewReloader(bt.config.ReloadModules)
factory := module.NewFactory(b.Publisher)
factory := module.NewFactory(bt.config.MaxStartDelay, b.Publisher)

go moduleReloader.Run(factory)
wg.Add(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,22 @@ metricbeat.modules:
hosts: ["root@tcp(127.0.0.1:3306)/"]
------------------------------------------------------------------------------

==== Metricbeat Options
==== General Options

===== max_start_delay

The maximum random delay to apply to the startup of a metricset. Random delays
ranging from [0, _max_start_delay_) are applied to reduce the thundering herd
effect that can occur if a fleet of machines running Metricbeat are restarted at
the same time. Specifying a value of 0 disables the startup delay. The default
is 10s.

[source,yaml]
----
metricbeat.max_start_delay: 10s
----

==== Module Options

You can specify the following options in the `metricbeat` section of the +{beatname_lc}.yml+ config file. These options
are the same for all modules. Each module may have additional configuration options that are specific to that module.
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/mb/module/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func ExampleWrapper() {
}

// Create a new Wrapper based on the configuration.
m, err := module.NewWrapper(config, mb.Registry)
m, err := module.NewWrapper(0, config, mb.Registry)
if err != nil {
fmt.Println("Error:", err)
return
Expand Down Expand Up @@ -97,7 +97,7 @@ func ExampleRunner() {
}

// Create a new Wrapper based on the configuration.
m, err := module.NewWrapper(config, mb.Registry)
m, err := module.NewWrapper(0, config, mb.Registry)
if err != nil {
return
}
Expand Down
12 changes: 8 additions & 4 deletions metricbeat/mb/module/factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package module

import (
"time"

"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher"
Expand All @@ -9,18 +11,20 @@ import (

// Factory is used to register and reload modules
type Factory struct {
client func() publisher.Client
client func() publisher.Client
maxStartDelay time.Duration
}

// NewFactory creates new Reloader instance for the given config
func NewFactory(p publisher.Publisher) *Factory {
func NewFactory(maxStartDelay time.Duration, p publisher.Publisher) *Factory {
return &Factory{
client: p.Connect,
client: p.Connect,
maxStartDelay: maxStartDelay,
}
}

func (r *Factory) Create(c *common.Config) (cfgfile.Runner, error) {
w, err := NewWrapper(c, mb.Registry)
w, err := NewWrapper(r.maxStartDelay, c, mb.Registry)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/mb/module/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestRunner(t *testing.T) {
}

// Create a new Wrapper based on the configuration.
m, err := module.NewWrapper(config, mb.Registry)
m, err := module.NewWrapper(0, config, mb.Registry)
if err != nil {
t.Fatal(err)
}
Expand Down
30 changes: 22 additions & 8 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package module

import (
"fmt"
"math/rand"
"sync"
"time"

Expand Down Expand Up @@ -36,9 +37,10 @@ var (
// Use NewWrapper or NewWrappers to construct new Wrappers.
type Wrapper struct {
mb.Module
filters *processors.Processors
metricSets []*metricSetWrapper // List of pointers to its associated MetricSets.
configHash uint64
filters *processors.Processors
metricSets []*metricSetWrapper // List of pointers to its associated MetricSets.
configHash uint64
maxStartDelay time.Duration
}

// metricSetWrapper contains the MetricSet and the private data associated with
Expand All @@ -61,8 +63,8 @@ type stats struct {
// NewWrapper create a new Module and its associated MetricSets based
// on the given configuration. It constructs the supporting filters and stores
// them in the Wrapper.
func NewWrapper(moduleConfig *common.Config, r *mb.Register) (*Wrapper, error) {
mws, err := NewWrappers([]*common.Config{moduleConfig}, r)
func NewWrapper(maxStartDelay time.Duration, moduleConfig *common.Config, r *mb.Register) (*Wrapper, error) {
mws, err := NewWrappers(maxStartDelay, []*common.Config{moduleConfig}, r)
if err != nil {
return nil, err
}
Expand All @@ -77,7 +79,7 @@ func NewWrapper(moduleConfig *common.Config, r *mb.Register) (*Wrapper, error) {
// NewWrappers creates new Modules and their associated MetricSets based
// on the given configuration. It constructs the supporting filters and stores
// them all in a Wrapper.
func NewWrappers(modulesConfig []*common.Config, r *mb.Register) ([]*Wrapper, error) {
func NewWrappers(maxStartDelay time.Duration, modulesConfig []*common.Config, r *mb.Register) ([]*Wrapper, error) {
modules, err := mb.NewModules(modulesConfig, r)
if err != nil {
return nil, err
Expand All @@ -95,8 +97,9 @@ func NewWrappers(modulesConfig []*common.Config, r *mb.Register) ([]*Wrapper, er
}

mw := &Wrapper{
Module: k,
filters: f,
Module: k,
filters: f,
maxStartDelay: maxStartDelay,
}
wrappers = append(wrappers, mw)

Expand Down Expand Up @@ -188,6 +191,17 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- common.MapStr)
defer logp.Recover(fmt.Sprintf("recovered from panic while fetching "+
"'%s/%s' for host '%s'", msw.module.Name(), msw.Name(), msw.Host()))

// Start each metricset randomly over a period of MaxDelayPeriod.
if msw.module.maxStartDelay > 0 {
delay := time.Duration(rand.Int63n(int64(msw.module.maxStartDelay)))
debugf("%v/%v will start after %v", msw.module.Name(), msw.Name(), delay)
select {
case <-done:
return
case <-time.After(delay):
}
}

debugf("Starting %s", msw)
defer debugf("Stopped %s", msw)

Expand Down
6 changes: 3 additions & 3 deletions metricbeat/mb/module/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestWrapperOfEventFetcher(t *testing.T) {
"hosts": hosts,
})

m, err := module.NewWrapper(c, newTestRegistry(t))
m, err := module.NewWrapper(0, c, newTestRegistry(t))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestWrapperOfReportingFetcher(t *testing.T) {
"hosts": hosts,
})

m, err := module.NewWrapper(c, newTestRegistry(t))
m, err := module.NewWrapper(0, c, newTestRegistry(t))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestWrapperOfPushMetricSet(t *testing.T) {
"hosts": hosts,
})

m, err := module.NewWrapper(c, newTestRegistry(t))
m, err := module.NewWrapper(0, c, newTestRegistry(t))
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/metricbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ metricbeat.config.modules:
# Set to true to enable config reloading
reload.enabled: false

# Maximum amount of time to randomly delay the start of a metricset. Use 0 to
# disable startup delay.
metricbeat.max_start_delay: 10s

#========================== Modules configuration ============================
metricbeat.modules:

Expand Down
3 changes: 3 additions & 0 deletions metricbeat/tests/system/config/metricbeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ metricbeat.config.modules:
reload.enabled: true
{% endif -%}

# Disable random start delay for metricsets.
metricbeat.max_start_delay: 0

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
Expand Down

0 comments on commit 5c0766a

Please sign in to comment.