Skip to content

Commit

Permalink
Keep a mapping of autodiscover instances and deployed configs (elasti…
Browse files Browse the repository at this point in the history
…c#8851)

* Keep a mapping of autodiscover instances and deployed configs

This changes stop behavior, sometimes not all metadata is available in
the stop event, this results in some configs not being stopped because
of that.

After this change, we only rely on instance id from the event to decide
what configs to delete, as we'll keep a map of instance id -> deployed
configs.

(cherry picked from commit 571b008)
  • Loading branch information
exekias authored and Carlos Pérez-Aradros Herce committed Nov 29, 2018
1 parent 277a767 commit 5096e4f
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 93 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ https://github.com/elastic/beats/compare/v6.5.0...6.x[Check the HEAD diff]
*Affecting all Beats*

- Propagate Sync error when running SafeFileRotate. {pull}9069[9069]
- Fix autodiscover configurations stopping when metadata is missing. {pull}8851[8851]

*Auditbeat*

Expand Down
78 changes: 48 additions & 30 deletions libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package autodiscover

import (
"fmt"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -60,7 +61,7 @@ type Autodiscover struct {
defaultPipeline beat.Pipeline
adapter Adapter
providers []Provider
configs map[uint64]*reload.ConfigWithMeta
configs map[string]map[uint64]*reload.ConfigWithMeta
runners *cfgfile.RunnerList
meta *meta.Map

Expand All @@ -87,7 +88,7 @@ func NewAutodiscover(name string, pipeline beat.Pipeline, adapter Adapter, confi
bus: bus,
defaultPipeline: pipeline,
adapter: adapter,
configs: map[uint64]*reload.ConfigWithMeta{},
configs: map[string]map[uint64]*reload.ConfigWithMeta{},
runners: cfgfile.NewRunnerList("autodiscover", adapter, pipeline),
providers: providers,
meta: meta.NewMap(),
Expand Down Expand Up @@ -136,9 +137,11 @@ func (a *Autodiscover) worker() {
logp.Debug(debugK, "Reloading existing autodiscover configs after error")
}

configs := make([]*reload.ConfigWithMeta, 0, len(a.configs))
for _, c := range a.configs {
configs = append(configs, c)
configs := []*reload.ConfigWithMeta{}
for _, list := range a.configs {
for _, c := range list {
configs = append(configs, c)
}
}

err := a.runners.Reload(configs)
Expand All @@ -154,12 +157,25 @@ func (a *Autodiscover) worker() {
func (a *Autodiscover) handleStart(event bus.Event) bool {
var updated bool

logp.Debug(debugK, "Got a start event: %v", event)

eventID := getID(event)
if eventID == "" {
logp.Err("Event didn't provide instance id: %+v, ignoring it", event)
return false
}

// Ensure configs list exists for this instance
if _, ok := a.configs[eventID]; !ok {
a.configs[eventID] = map[uint64]*reload.ConfigWithMeta{}
}

configs, err := a.adapter.CreateConfig(event)
if err != nil {
logp.Debug(debugK, "Could not generate config from event %v: %v", event, err)
return false
}
logp.Debug(debugK, "Got a start event: %v, generated configs: %+v", event, configs)
logp.Debug(debugK, "Generated configs: %+v", configs)

meta := getMeta(event)
for _, config := range configs {
Expand All @@ -178,12 +194,12 @@ func (a *Autodiscover) handleStart(event bus.Event) bool {
// Update meta no matter what
dynFields := a.meta.Store(hash, meta)

if a.runners.Has(hash) {
if a.configs[eventID][hash] != nil {
logp.Debug(debugK, "Config %v is already running", config)
continue
}

a.configs[hash] = &reload.ConfigWithMeta{
a.configs[eventID][hash] = &reload.ConfigWithMeta{
Config: config,
Meta: &dynFields,
}
Expand All @@ -196,33 +212,20 @@ func (a *Autodiscover) handleStart(event bus.Event) bool {
func (a *Autodiscover) handleStop(event bus.Event) bool {
var updated bool

configs, err := a.adapter.CreateConfig(event)
if err != nil {
logp.Debug(debugK, "Could not generate config from event %v: %v", event, err)
logp.Debug(debugK, "Got a stop event: %v", event)
eventID := getID(event)
if eventID == "" {
logp.Err("Event didn't provide instance id: %+v, ignoring it", event)
return false
}
logp.Debug(debugK, "Got a stop event: %v, generated configs: %+v", event, configs)

for _, config := range configs {
hash, err := cfgfile.HashConfig(config)
if err != nil {
logp.Debug(debugK, "Could not hash config %v: %v", config, err)
continue
}

if !a.runners.Has(hash) {
logp.Debug(debugK, "Config %v is not running", config)
continue
}

if a.runners.Has(hash) {
delete(a.configs, hash)
updated = true
} else {
logp.Debug(debugK, "Runner not found for stopping: %s", hash)
}
if len(a.configs[eventID]) > 0 {
logp.Debug(debugK, "Stopping %d configs", len(a.configs[eventID]))
updated = true
}

delete(a.configs, eventID)

return updated
}

Expand All @@ -241,6 +244,21 @@ func getMeta(event bus.Event) common.MapStr {
return meta
}

// getID returns the event "id" field string if present
func getID(e bus.Event) string {
provider, ok := e["provider"]
if !ok {
return ""
}

id, ok := e["id"]
if !ok {
return ""
}

return fmt.Sprintf("%s:%s", provider, id)
}

// Stop autodiscover process
func (a *Autodiscover) Stop() {
if a == nil {
Expand Down
47 changes: 31 additions & 16 deletions libbeat/autodiscover/autodiscover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
Expand Down Expand Up @@ -137,7 +138,7 @@ func TestAutodiscover(t *testing.T) {
// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)
Registry = NewRegistry()
Registry.AddProvider("mock", func(b bus.Bus, c *common.Config) (Provider, error) {
Registry.AddProvider("mock", func(b bus.Bus, uuid uuid.UUID, c *common.Config) (Provider, error) {
// intercept bus to mock events
busChan <- b

Expand Down Expand Up @@ -173,7 +174,9 @@ func TestAutodiscover(t *testing.T) {

// Test start event
eventBus.Publish(bus.Event{
"start": true,
"id": "foo",
"provider": "mock",
"start": true,
"meta": common.MapStr{
"foo": "bar",
},
Expand All @@ -182,14 +185,16 @@ func TestAutodiscover(t *testing.T) {

runners := adapter.Runners()
assert.Equal(t, len(runners), 1)
assert.Equal(t, len(autodiscover.configs), 1)
assert.Equal(t, len(autodiscover.configs["mock:foo"]), 1)
assert.Equal(t, runners[0].meta.Get()["foo"], "bar")
assert.True(t, runners[0].started)
assert.False(t, runners[0].stopped)

// Test update
eventBus.Publish(bus.Event{
"start": true,
"id": "foo",
"provider": "mock",
"start": true,
"meta": common.MapStr{
"foo": "baz",
},
Expand All @@ -198,20 +203,24 @@ func TestAutodiscover(t *testing.T) {

runners = adapter.Runners()
assert.Equal(t, len(runners), 1)
assert.Equal(t, len(autodiscover.configs), 1)
assert.Equal(t, len(autodiscover.configs["mock:foo"]), 1)
assert.Equal(t, runners[0].meta.Get()["foo"], "baz") // meta is updated
assert.True(t, runners[0].started)
assert.False(t, runners[0].stopped)

// Test stop/start
eventBus.Publish(bus.Event{
"stop": true,
"id": "foo",
"provider": "mock",
"stop": true,
"meta": common.MapStr{
"foo": "baz",
},
})
eventBus.Publish(bus.Event{
"start": true,
"id": "foo",
"provider": "mock",
"start": true,
"meta": common.MapStr{
"foo": "baz",
},
Expand All @@ -220,15 +229,17 @@ func TestAutodiscover(t *testing.T) {

runners = adapter.Runners()
assert.Equal(t, len(runners), 2)
assert.Equal(t, len(autodiscover.configs), 1)
assert.Equal(t, len(autodiscover.configs["mock:foo"]), 1)
assert.True(t, runners[0].stopped)
assert.Equal(t, runners[1].meta.Get()["foo"], "baz")
assert.True(t, runners[1].started)
assert.False(t, runners[1].stopped)

// Test stop event
eventBus.Publish(bus.Event{
"stop": true,
"id": "foo",
"provider": "mock",
"stop": true,
"meta": common.MapStr{
"foo": "baz",
},
Expand All @@ -237,7 +248,7 @@ func TestAutodiscover(t *testing.T) {

runners = adapter.Runners()
assert.Equal(t, len(runners), 2)
assert.Equal(t, len(autodiscover.configs), 0)
assert.Equal(t, len(autodiscover.configs["mock:foo"]), 0)
assert.Equal(t, runners[1].meta.Get()["foo"], "baz")
assert.True(t, runners[1].started)
assert.True(t, runners[1].stopped)
Expand All @@ -248,7 +259,7 @@ func TestAutodiscoverHash(t *testing.T) {
busChan := make(chan bus.Bus, 1)

Registry = NewRegistry()
Registry.AddProvider("mock", func(b bus.Bus, c *common.Config) (Provider, error) {
Registry.AddProvider("mock", func(b bus.Bus, uuid uuid.UUID, c *common.Config) (Provider, error) {
// intercept bus to mock events
busChan <- b

Expand Down Expand Up @@ -287,7 +298,9 @@ func TestAutodiscoverHash(t *testing.T) {

// Test start event
eventBus.Publish(bus.Event{
"start": true,
"id": "foo",
"provider": "mock",
"start": true,
"meta": common.MapStr{
"foo": "bar",
},
Expand All @@ -296,7 +309,7 @@ func TestAutodiscoverHash(t *testing.T) {

runners := adapter.Runners()
assert.Equal(t, len(runners), 2)
assert.Equal(t, len(autodiscover.configs), 2)
assert.Equal(t, len(autodiscover.configs["mock:foo"]), 2)
assert.Equal(t, runners[0].meta.Get()["foo"], "bar")
assert.True(t, runners[0].started)
assert.False(t, runners[0].stopped)
Expand All @@ -309,7 +322,7 @@ func TestAutodiscoverWithConfigCheckFailures(t *testing.T) {
// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)
Registry = NewRegistry()
Registry.AddProvider("mock", func(b bus.Bus, c *common.Config) (Provider, error) {
Registry.AddProvider("mock", func(b bus.Bus, uuid uuid.UUID, c *common.Config) (Provider, error) {
// intercept bus to mock events
busChan <- b

Expand Down Expand Up @@ -348,15 +361,17 @@ func TestAutodiscoverWithConfigCheckFailures(t *testing.T) {

// Test start event
eventBus.Publish(bus.Event{
"start": true,
"id": "foo",
"provider": "mock",
"start": true,
"meta": common.MapStr{
"foo": "bar",
},
})

// As only the second config is valid, total runners will be 1
wait(t, func() bool { return len(adapter.Runners()) == 1 })
assert.Equal(t, 1, len(autodiscover.configs))
assert.Equal(t, 1, len(autodiscover.configs["mock:foo"]))
}

func wait(t *testing.T, test func() bool) {
Expand Down
11 changes: 9 additions & 2 deletions libbeat/autodiscover/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"strings"

"github.com/gofrs/uuid"

"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
Expand All @@ -33,7 +35,7 @@ type Provider interface {
}

// ProviderBuilder creates a new provider based on the given config and returns it
type ProviderBuilder func(bus.Bus, *common.Config) (Provider, error)
type ProviderBuilder func(bus.Bus, uuid.UUID, *common.Config) (Provider, error)

// AddProvider registers a new ProviderBuilder
func (r *registry) AddProvider(name string, provider ProviderBuilder) error {
Expand Down Expand Up @@ -80,5 +82,10 @@ func (r *registry) BuildProvider(bus bus.Bus, c *common.Config) (Provider, error
return nil, fmt.Errorf("Unknown autodiscover provider %s", config.Type)
}

return builder(bus, c)
uuid, err := uuid.NewV4()
if err != nil {
return nil, err
}

return builder(bus, uuid, c)
}
Loading

0 comments on commit 5096e4f

Please sign in to comment.