Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep a mapping of autodiscover instances and deployed configs #8851

Merged
merged 10 commits into from
Nov 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha1...master[Check the HEAD d

*Affecting all Beats*

- Fix autodiscover configurations stopping when metadata is missing. {pull}8851[8851]

*Auditbeat*

*Filebeat*
Expand Down
77 changes: 47 additions & 30 deletions libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type Autodiscover struct {
defaultPipeline beat.Pipeline
adapter Adapter
providers []Provider
configs map[uint64]*reload.ConfigWithMeta
configs map[string]map[uint64]*reload.ConfigWithMeta
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reading of this is that these IDs must be globally unique regardless of provider. This could be an issue if two providers use the same key.

Maybe we could modify getId to prepend the provider name? That way providers would only be responsible for generating IDs unique to themselves.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about it further, it's probably best just to add another level of map, using a synthetic key based on the provider. You can have multiple instances of a provider, and it probably makes sense to have the keys scoped per instance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Autodiscover doesn't know about the provider, as everything is get from the same bus, I'm inclined to leave this as it is, as I don't expect this kind of configurations? Even if the user configures several providers, stop events would still be consistent?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried because there's no explicit contract here that external providers would have IDs that would never clash. I don't think any of the providers we use today have keys that clash, but it seems like we're creating an extra failure mode by not doing this namespacing.

Why wouldn't we enforce that sort of thing where we can? It seems like tempting fate not to. I also think it would make the workings of this code more clear. Encoding our understanding of the domain into the datastructures increases code readability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just pushed the changes to include unique provider as part of the mapping 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes look good!

runners *cfgfile.RunnerList
meta *meta.Map

Expand All @@ -88,7 +88,7 @@ func NewAutodiscover(name string, pipeline beat.Pipeline, adapter Adapter, confi
bus: bus,
defaultPipeline: pipeline,
adapter: adapter,
configs: map[uint64]*reload.ConfigWithMeta{},
configs: map[string]map[uint64]*reload.ConfigWithMeta{},
runners: cfgfile.NewRunnerList("autodiscover", adapter, pipeline),
providers: providers,
meta: meta.NewMap(),
Expand Down Expand Up @@ -142,9 +142,11 @@ func (a *Autodiscover) worker() {
logp.Debug(debugK, "Reloading existing autodiscover configs after error")
}

configs := make([]*reload.ConfigWithMeta, 0, len(a.configs))
for _, c := range a.configs {
configs = append(configs, c)
configs := []*reload.ConfigWithMeta{}
for _, list := range a.configs {
for _, c := range list {
configs = append(configs, c)
}
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
}

err := a.runners.Reload(configs)
Expand All @@ -160,12 +162,25 @@ func (a *Autodiscover) worker() {
func (a *Autodiscover) handleStart(event bus.Event) bool {
var updated bool

logp.Debug(debugK, "Got a start event: %v", event)

eventID := getID(event)
if eventID == "" {
logp.Err("Event didn't provide instance id: %+v, ignoring it", event)
return false
}

// Ensure configs list exists for this instance
if _, ok := a.configs[eventID]; !ok {
a.configs[eventID] = map[uint64]*reload.ConfigWithMeta{}
}

configs, err := a.adapter.CreateConfig(event)
if err != nil {
logp.Debug(debugK, "Could not generate config from event %v: %v", event, err)
return false
}
logp.Debug(debugK, "Got a start event: %v, generated configs: %+v", event, configs)
logp.Debug(debugK, "Generated configs: %+v", configs)

meta := getMeta(event)
for _, config := range configs {
Expand All @@ -184,12 +199,12 @@ func (a *Autodiscover) handleStart(event bus.Event) bool {
// Update meta no matter what
dynFields := a.meta.Store(hash, meta)

if a.runners.Has(hash) {
if a.configs[eventID][hash] != nil {
logp.Debug(debugK, "Config %v is already running", config)
continue
}

a.configs[hash] = &reload.ConfigWithMeta{
a.configs[eventID][hash] = &reload.ConfigWithMeta{
andrewvc marked this conversation as resolved.
Show resolved Hide resolved
Config: config,
Meta: &dynFields,
}
Expand All @@ -202,33 +217,20 @@ func (a *Autodiscover) handleStart(event bus.Event) bool {
func (a *Autodiscover) handleStop(event bus.Event) bool {
var updated bool

configs, err := a.adapter.CreateConfig(event)
if err != nil {
logp.Debug(debugK, "Could not generate config from event %v: %v", event, err)
logp.Debug(debugK, "Got a stop event: %v", event)
eventID := getID(event)
if eventID == "" {
logp.Err("Event didn't provide instance id: %+v, ignoring it", event)
return false
}
logp.Debug(debugK, "Got a stop event: %v, generated configs: %+v", event, configs)

for _, config := range configs {
hash, err := cfgfile.HashConfig(config)
if err != nil {
logp.Debug(debugK, "Could not hash config %v: %v", config, err)
continue
}

if !a.runners.Has(hash) {
logp.Debug(debugK, "Config %v is not running", config)
continue
}

if a.runners.Has(hash) {
delete(a.configs, hash)
updated = true
} else {
logp.Debug(debugK, "Runner not found for stopping: %d", hash)
}
if len(a.configs[eventID]) > 0 {
logp.Debug(debugK, "Stopping %d configs", len(a.configs[eventID]))
updated = true
}

delete(a.configs, eventID)

return updated
}

Expand All @@ -247,6 +249,21 @@ func getMeta(event bus.Event) common.MapStr {
return meta
}

// getID returns the event "id" field string if present
func getID(e bus.Event) string {
provider, ok := e["provider"]
if !ok {
return ""
}

id, ok := e["id"]
if !ok {
return ""
}

return fmt.Sprintf("%s:%s", provider, id)
}

// Stop autodiscover process
func (a *Autodiscover) Stop() {
if a == nil {
Expand Down
47 changes: 31 additions & 16 deletions libbeat/autodiscover/autodiscover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
Expand Down Expand Up @@ -137,7 +138,7 @@ func TestAutodiscover(t *testing.T) {
// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)
Registry = NewRegistry()
Registry.AddProvider("mock", func(b bus.Bus, c *common.Config) (Provider, error) {
Registry.AddProvider("mock", func(b bus.Bus, uuid uuid.UUID, c *common.Config) (Provider, error) {
// intercept bus to mock events
busChan <- b

Expand Down Expand Up @@ -173,7 +174,9 @@ func TestAutodiscover(t *testing.T) {

// Test start event
eventBus.Publish(bus.Event{
"start": true,
"id": "foo",
"provider": "mock",
"start": true,
"meta": common.MapStr{
"foo": "bar",
},
Expand All @@ -182,14 +185,16 @@ func TestAutodiscover(t *testing.T) {

runners := adapter.Runners()
assert.Equal(t, len(runners), 1)
assert.Equal(t, len(autodiscover.configs), 1)
assert.Equal(t, len(autodiscover.configs["mock:foo"]), 1)
assert.Equal(t, runners[0].meta.Get()["foo"], "bar")
assert.True(t, runners[0].started)
assert.False(t, runners[0].stopped)

// Test update
eventBus.Publish(bus.Event{
"start": true,
"id": "foo",
"provider": "mock",
"start": true,
"meta": common.MapStr{
"foo": "baz",
},
Expand All @@ -198,20 +203,24 @@ func TestAutodiscover(t *testing.T) {

runners = adapter.Runners()
assert.Equal(t, len(runners), 1)
assert.Equal(t, len(autodiscover.configs), 1)
assert.Equal(t, len(autodiscover.configs["mock:foo"]), 1)
assert.Equal(t, runners[0].meta.Get()["foo"], "baz") // meta is updated
assert.True(t, runners[0].started)
assert.False(t, runners[0].stopped)

// Test stop/start
eventBus.Publish(bus.Event{
"stop": true,
"id": "foo",
"provider": "mock",
"stop": true,
"meta": common.MapStr{
"foo": "baz",
},
})
eventBus.Publish(bus.Event{
"start": true,
"id": "foo",
"provider": "mock",
"start": true,
"meta": common.MapStr{
"foo": "baz",
},
Expand All @@ -220,15 +229,17 @@ func TestAutodiscover(t *testing.T) {

runners = adapter.Runners()
assert.Equal(t, len(runners), 2)
assert.Equal(t, len(autodiscover.configs), 1)
assert.Equal(t, len(autodiscover.configs["mock:foo"]), 1)
assert.True(t, runners[0].stopped)
assert.Equal(t, runners[1].meta.Get()["foo"], "baz")
assert.True(t, runners[1].started)
assert.False(t, runners[1].stopped)

// Test stop event
eventBus.Publish(bus.Event{
"stop": true,
"id": "foo",
"provider": "mock",
"stop": true,
"meta": common.MapStr{
"foo": "baz",
},
Expand All @@ -237,7 +248,7 @@ func TestAutodiscover(t *testing.T) {

runners = adapter.Runners()
assert.Equal(t, len(runners), 2)
assert.Equal(t, len(autodiscover.configs), 0)
assert.Equal(t, len(autodiscover.configs["mock:foo"]), 0)
assert.Equal(t, runners[1].meta.Get()["foo"], "baz")
assert.True(t, runners[1].started)
assert.True(t, runners[1].stopped)
Expand All @@ -248,7 +259,7 @@ func TestAutodiscoverHash(t *testing.T) {
busChan := make(chan bus.Bus, 1)

Registry = NewRegistry()
Registry.AddProvider("mock", func(b bus.Bus, c *common.Config) (Provider, error) {
Registry.AddProvider("mock", func(b bus.Bus, uuid uuid.UUID, c *common.Config) (Provider, error) {
// intercept bus to mock events
busChan <- b

Expand Down Expand Up @@ -287,7 +298,9 @@ func TestAutodiscoverHash(t *testing.T) {

// Test start event
eventBus.Publish(bus.Event{
"start": true,
"id": "foo",
"provider": "mock",
"start": true,
"meta": common.MapStr{
"foo": "bar",
},
Expand All @@ -296,7 +309,7 @@ func TestAutodiscoverHash(t *testing.T) {

runners := adapter.Runners()
assert.Equal(t, len(runners), 2)
assert.Equal(t, len(autodiscover.configs), 2)
assert.Equal(t, len(autodiscover.configs["mock:foo"]), 2)
assert.Equal(t, runners[0].meta.Get()["foo"], "bar")
assert.True(t, runners[0].started)
assert.False(t, runners[0].stopped)
Expand All @@ -309,7 +322,7 @@ func TestAutodiscoverWithConfigCheckFailures(t *testing.T) {
// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)
Registry = NewRegistry()
Registry.AddProvider("mock", func(b bus.Bus, c *common.Config) (Provider, error) {
Registry.AddProvider("mock", func(b bus.Bus, uuid uuid.UUID, c *common.Config) (Provider, error) {
// intercept bus to mock events
busChan <- b

Expand Down Expand Up @@ -348,15 +361,17 @@ func TestAutodiscoverWithConfigCheckFailures(t *testing.T) {

// Test start event
eventBus.Publish(bus.Event{
"start": true,
"id": "foo",
"provider": "mock",
"start": true,
"meta": common.MapStr{
"foo": "bar",
},
})

// As only the second config is valid, total runners will be 1
wait(t, func() bool { return len(adapter.Runners()) == 1 })
assert.Equal(t, 1, len(autodiscover.configs))
assert.Equal(t, 1, len(autodiscover.configs["mock:foo"]))
}

func wait(t *testing.T, test func() bool) {
Expand Down
11 changes: 9 additions & 2 deletions libbeat/autodiscover/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"strings"

"github.com/gofrs/uuid"

"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
Expand All @@ -33,7 +35,7 @@ type Provider interface {
}

// ProviderBuilder creates a new provider based on the given config and returns it
type ProviderBuilder func(bus.Bus, *common.Config) (Provider, error)
type ProviderBuilder func(bus.Bus, uuid.UUID, *common.Config) (Provider, error)

// AddProvider registers a new ProviderBuilder
func (r *registry) AddProvider(name string, provider ProviderBuilder) error {
Expand Down Expand Up @@ -80,5 +82,10 @@ func (r *registry) BuildProvider(bus bus.Bus, c *common.Config) (Provider, error
return nil, fmt.Errorf("Unknown autodiscover provider %s", config.Type)
}

return builder(bus, c)
uuid, err := uuid.NewV4()
if err != nil {
return nil, err
}

return builder(bus, uuid, c)
}
Loading