Skip to content

Commit

Permalink
Cherry-pick elastic#7961 to 6.4: Implement CheckConfig in RunnerFacto…
Browse files Browse the repository at this point in the history
…ry to make autodiscover check configs (elastic#8113)

Cherry-pick of PR elastic#7961 to 6.4 branch. Original message: 

After `autodiscover` was implemented to retry failed configs, it began to retry spinning up configs every 10s. This is great for configs that are actually valid as it might be a pipeline related issue. Configs that say dont have `hosts` on it will always fail. In such scenarios, we should check the validity of the config and if it is invalid, it should not be retried.

This PR, implements the missing `CheckConfig` API by moving it from `Adapter` interface into the `RunnerFactory` interface. It is implemented only for metricbeat right now. It needs to be enhanced to Filebeat once the Filebeat refactor is complete as there is a known memory leak on trying to simply trying to create an input and not start/stop it. 

In future, each module/input can expose an API which validates a config instead of trying to call the constructor which is not optimal.
  • Loading branch information
exekias authored and ruflin committed Aug 28, 2018
1 parent 13cb6ad commit cd40b48
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.4[Check the HEAD diff]
- Allow for cloud-id to specify a custom port. This makes cloud-id work in ECE contexts. {pull}7887[7887]
- Add support to grow or shrink an existing spool file between restarts. {pull}7859[7859]
- Make kubernetes autodiscover ignore events with empty container IDs {pull}7971[7971]
- Implement CheckConfig in RunnerFactory to make autodiscover check configs {pull}7961[7961]

*Auditbeat*

Expand Down
6 changes: 4 additions & 2 deletions filebeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ func (m *AutodiscoverAdapter) CreateConfig(e bus.Event) ([]*common.Config, error

// CheckConfig tests given config to check if it will work or not, returns errors in case it won't work
func (m *AutodiscoverAdapter) CheckConfig(c *common.Config) error {
// TODO implement config check for all modules
return nil
if c.HasField("module") {
return m.moduleFactory.CheckConfig(c)
}
return m.inputFactory.CheckConfig(c)
}

// Create a module or input from the given config
Expand Down
6 changes: 6 additions & 0 deletions filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP
}, nil
}

// CheckConfig checks if a config is valid or not
func (f *Factory) CheckConfig(config *common.Config) error {
// TODO: add code here once we know that spinning up a filebeat input to check for errors doesn't cause memory leaks.
return nil
}

func (p *inputsRunner) Start() {
// Load pipelines
if p.pipelineLoaderFactory != nil {
Expand Down
6 changes: 6 additions & 0 deletions filebeat/input/runnerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,9 @@ func (r *RunnerFactory) Create(

return p, nil
}

// CheckConfig checks if a config is valid or not
func (r *RunnerFactory) CheckConfig(config *common.Config) error {
// TODO: add code here once we know that spinning up a filebeat input to check for errors doesn't cause memory leaks.
return nil
}
3 changes: 0 additions & 3 deletions libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ type Adapter interface {
// CreateConfig generates a valid list of configs from the given event, the received event will have all keys defined by `StartFilter`
CreateConfig(bus.Event) ([]*common.Config, error)

// CheckConfig tests given config to check if it will work or not, returns errors in case it won't work
CheckConfig(*common.Config) error

// RunnerFactory provides runner creation by feeding valid configs
cfgfile.RunnerFactory

Expand Down
72 changes: 71 additions & 1 deletion libbeat/autodiscover/autodiscover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package autodiscover

import (
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -75,7 +76,17 @@ func (m *mockAdapter) CreateConfig(bus.Event) ([]*common.Config, error) {
}

// CheckConfig tests given config to check if it will work or not, returns errors in case it won't work
func (m *mockAdapter) CheckConfig(*common.Config) error {
func (m *mockAdapter) CheckConfig(c *common.Config) error {
config := struct {
Broken bool `config:"broken"`
}{}
c.Unpack(&config)

if config.Broken {
fmt.Println("broken")
return fmt.Errorf("Broken config")
}

return nil
}

Expand Down Expand Up @@ -171,6 +182,7 @@ func TestAutodiscover(t *testing.T) {

runners := adapter.Runners()
assert.Equal(t, len(runners), 1)
assert.Equal(t, len(autodiscover.configs), 1)
assert.Equal(t, runners[0].meta.Get()["foo"], "bar")
assert.True(t, runners[0].started)
assert.False(t, runners[0].stopped)
Expand All @@ -186,6 +198,7 @@ func TestAutodiscover(t *testing.T) {

runners = adapter.Runners()
assert.Equal(t, len(runners), 1)
assert.Equal(t, len(autodiscover.configs), 1)
assert.Equal(t, runners[0].meta.Get()["foo"], "baz") // meta is updated
assert.True(t, runners[0].started)
assert.False(t, runners[0].stopped)
Expand All @@ -207,6 +220,7 @@ func TestAutodiscover(t *testing.T) {

runners = adapter.Runners()
assert.Equal(t, len(runners), 2)
assert.Equal(t, len(autodiscover.configs), 1)
assert.True(t, runners[0].stopped)
assert.Equal(t, runners[1].meta.Get()["foo"], "baz")
assert.True(t, runners[1].started)
Expand All @@ -223,6 +237,7 @@ func TestAutodiscover(t *testing.T) {

runners = adapter.Runners()
assert.Equal(t, len(runners), 2)
assert.Equal(t, len(autodiscover.configs), 0)
assert.Equal(t, runners[1].meta.Get()["foo"], "baz")
assert.True(t, runners[1].started)
assert.True(t, runners[1].stopped)
Expand Down Expand Up @@ -281,6 +296,7 @@ func TestAutodiscoverHash(t *testing.T) {

runners := adapter.Runners()
assert.Equal(t, len(runners), 2)
assert.Equal(t, len(autodiscover.configs), 2)
assert.Equal(t, runners[0].meta.Get()["foo"], "bar")
assert.True(t, runners[0].started)
assert.False(t, runners[0].stopped)
Expand All @@ -289,6 +305,60 @@ func TestAutodiscoverHash(t *testing.T) {
assert.False(t, runners[1].stopped)
}

func TestAutodiscoverWithConfigCheckFailures(t *testing.T) {
// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)
Registry = NewRegistry()
Registry.AddProvider("mock", func(b bus.Bus, c *common.Config) (Provider, error) {
// intercept bus to mock events
busChan <- b

return &mockProvider{}, nil
})

// Create a mock adapter
runnerConfig1, _ := common.NewConfigFrom(map[string]string{
"broken": "true",
})
runnerConfig2, _ := common.NewConfigFrom(map[string]string{
"runner": "2",
})
adapter := mockAdapter{
configs: []*common.Config{runnerConfig1, runnerConfig2},
}

// and settings:
providerConfig, _ := common.NewConfigFrom(map[string]string{
"type": "mock",
})
config := Config{
Providers: []*common.Config{providerConfig},
}

// Create autodiscover manager
autodiscover, err := NewAutodiscover("test", nil, &adapter, &config)
if err != nil {
t.Fatal(err)
}

// Start it
autodiscover.Start()
defer autodiscover.Stop()
eventBus := <-busChan

// Test start event
eventBus.Publish(bus.Event{
"start": true,
"meta": common.MapStr{
"foo": "bar",
},
})

// As only the second config is valid, total runners will be 1
wait(t, func() bool { return len(adapter.Runners()) == 1 })
assert.Equal(t, 1, len(autodiscover.configs))
}

func wait(t *testing.T, test func() bool) {
sleep := 20 * time.Millisecond
ready := test()
Expand Down
4 changes: 4 additions & 0 deletions libbeat/cfgfile/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (r *runnerFactory) Create(x beat.Pipeline, c *common.Config, meta *common.M
return runner, err
}

func (r *runnerFactory) CheckConfig(config *common.Config) error {
return nil
}

func TestNewConfigs(t *testing.T) {
factory := &runnerFactory{}
list := NewRunnerList("", factory, nil)
Expand Down
1 change: 1 addition & 0 deletions libbeat/cfgfile/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Reload struct {

type RunnerFactory interface {
Create(p beat.Pipeline, config *common.Config, meta *common.MapStrPointer) (Runner, error)
CheckConfig(config *common.Config) error
}

type Runner interface {
Expand Down
3 changes: 1 addition & 2 deletions metricbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ func (m *AutodiscoverAdapter) CreateConfig(e bus.Event) ([]*common.Config, error

// CheckConfig tests given config to check if it will work or not, returns errors in case it won't work
func (m *AutodiscoverAdapter) CheckConfig(c *common.Config) error {
// TODO implement config check for all modules
return nil
return m.factory.CheckConfig(c)
}

// Create a module or prospector from the given config
Expand Down
10 changes: 10 additions & 0 deletions metricbeat/mb/module/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,13 @@ func (r *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP
mr := NewRunner(client, w)
return mr, nil
}

// CheckConfig checks if a config is valid or not
func (r *Factory) CheckConfig(config *common.Config) error {
_, err := NewWrapper(config, mb.Registry, r.options...)
if err != nil {
return err
}

return nil
}

0 comments on commit cd40b48

Please sign in to comment.