Skip to content

Commit

Permalink
Add builder support for autodiscover and annotations builder (#6408)
Browse files Browse the repository at this point in the history
* Add builder support for autodiscover and annotations builder
  • Loading branch information
vjsamuel authored and exekias committed Feb 23, 2018
1 parent 70efaf7 commit 326c68f
Show file tree
Hide file tree
Showing 32 changed files with 1,428 additions and 108 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- The `add_docker_metadata` and `add_kubernetes_metadata` processors are now GA, instead of Beta. {pull}6105[6105]
- The node name can be discovered automatically by machine-id matching when beat deployed outside kubernetes cluster. {pull}6146[6146]
- Panics will be written to the logger before exiting. {pull}6199[6199]
- Add builder support for autodiscover and annotations builder {pull}6408[6408]

*Auditbeat*

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package beater
package autodiscover

import (
"errors"
Expand Down
24 changes: 24 additions & 0 deletions filebeat/autodiscover/builder/logs/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package logs

import "github.com/elastic/beats/libbeat/common"

type config struct {
Key string `config:"key"`
Config []*common.Config `config:"config"`
}

func defaultConfig() config {
rawCfg := map[string]interface{}{
"type": "docker",
"containers": map[string]interface{}{
"ids": []string{
"${data.docker.container.id}",
},
},
}
cfg, _ := common.NewConfigFrom(rawCfg)
return config{
Key: "logs",
Config: []*common.Config{cfg},
}
}
99 changes: 99 additions & 0 deletions filebeat/autodiscover/builder/logs/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package logs

import (
"fmt"

"github.com/elastic/beats/libbeat/autodiscover"
"github.com/elastic/beats/libbeat/autodiscover/builder"
"github.com/elastic/beats/libbeat/autodiscover/template"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
"github.com/elastic/beats/libbeat/logp"
)

func init() {
autodiscover.Registry.AddBuilder("logs", NewLogAnnotations)
}

const (
multiline = "multiline"
includeLines = "include_lines"
excludeLines = "exclude_lines"
)

type logAnnotations struct {
Key string
Config []*common.Config
}

// NewLogAnnotations builds a log annotations builder
func NewLogAnnotations(cfg *common.Config) (autodiscover.Builder, error) {
config := defaultConfig()
err := cfg.Unpack(&config)

if err != nil {
return nil, fmt.Errorf("unable to unpack log.annotations config due to error: %v", err)
}

return &logAnnotations{config.Key, config.Config}, nil
}

// Create config based on input hints in the bus event
func (l *logAnnotations) CreateConfig(event bus.Event) []*common.Config {
var config []*common.Config

host, _ := event["host"].(string)
if host == "" {
return config
}

var hints common.MapStr
hIface, ok := event["hints"]
if ok {
hints, _ = hIface.(common.MapStr)
}

if builder.IsNoOp(hints, l.Key) == true {
return config
}

//TODO: Add module support

tempCfg := common.MapStr{}
mline := l.getMultiline(hints)
if len(mline) != 0 {
tempCfg.Put(multiline, mline)
}
if ilines := l.getIncludeLines(hints); len(ilines) != 0 {
tempCfg.Put(includeLines, ilines)
}
if elines := l.getExcludeLines(hints); len(elines) != 0 {
tempCfg.Put(excludeLines, elines)
}

// Merge config template with the configs from the annotations
for _, c := range l.Config {
if err := c.Merge(tempCfg); err != nil {
logp.Debug("logs.builder", "config merge failed with error: %v", err)
} else {
logp.Debug("logs.builder", "generated config %v", *c)
config = append(config, c)
}
}

// Apply information in event to the template to generate the final config
config = template.ApplyConfigTemplate(event, config)
return config
}

func (l *logAnnotations) getMultiline(hints common.MapStr) common.MapStr {
return builder.GetHintMapStr(hints, l.Key, multiline)
}

func (l *logAnnotations) getIncludeLines(hints common.MapStr) []string {
return builder.GetHintAsList(hints, l.Key, includeLines)
}

func (l *logAnnotations) getExcludeLines(hints common.MapStr) []string {
return builder.GetHintAsList(hints, l.Key, excludeLines)
}
145 changes: 145 additions & 0 deletions filebeat/autodiscover/builder/logs/logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package logs

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
)

func TestGenerateHints(t *testing.T) {
tests := []struct {
event bus.Event
len int
result common.MapStr
}{
// Hints without host should return nothing
{
event: bus.Event{
"hints": common.MapStr{
"metrics": common.MapStr{
"module": "prometheus",
},
},
},
len: 0,
result: common.MapStr{},
},
// Empty event hints should return default config
{
event: bus.Event{
"host": "1.2.3.4",
"kubernetes": common.MapStr{
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
},
"docker": common.MapStr{
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
},
},
len: 1,
result: common.MapStr{
"type": "docker",
"containers": map[string]interface{}{
"ids": []interface{}{"abc"},
},
},
},
// Hint with include|exclude_lines must be part of the input config
{
event: bus.Event{
"host": "1.2.3.4",
"kubernetes": common.MapStr{
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
},
"docker": common.MapStr{
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
},
"hints": common.MapStr{
"logs": common.MapStr{
"include_lines": "^test, ^test1",
"exclude_lines": "^test2, ^test3",
},
},
},
len: 1,
result: common.MapStr{
"type": "docker",
"containers": map[string]interface{}{
"ids": []interface{}{"abc"},
},
"include_lines": []interface{}{"^test", "^test1"},
"exclude_lines": []interface{}{"^test2", "^test3"},
},
},
// Hint with multiline config must have a multiline in the input config
{
event: bus.Event{
"host": "1.2.3.4",
"kubernetes": common.MapStr{
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
},
"docker": common.MapStr{
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
},
"hints": common.MapStr{
"logs": common.MapStr{
"multiline": common.MapStr{
"pattern": "^test",
"negate": "true",
},
},
},
},
len: 1,
result: common.MapStr{
"type": "docker",
"containers": map[string]interface{}{
"ids": []interface{}{"abc"},
},
"multiline": map[string]interface{}{
"pattern": "^test",
"negate": "true",
},
},
},
}

for _, test := range tests {
cfg := defaultConfig()
l := logAnnotations{
Key: cfg.Key,
Config: cfg.Config,
}
cfgs := l.CreateConfig(test.event)
assert.Equal(t, len(cfgs), test.len)

if test.len != 0 {
config := common.MapStr{}
err := cfgs[0].Unpack(&config)
assert.Nil(t, err)

assert.Equal(t, config, test.result)
}

}
}
6 changes: 6 additions & 0 deletions filebeat/autodiscover/include.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package autodiscover

import (
// include all filebeat specific builders
_ "github.com/elastic/beats/filebeat/autodiscover/builder/logs"
)
3 changes: 2 additions & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"

fbautodiscover "github.com/elastic/beats/filebeat/autodiscover"
"github.com/elastic/beats/filebeat/channel"
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/crawler"
Expand Down Expand Up @@ -295,7 +296,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

var adiscover *autodiscover.Autodiscover
if fb.config.Autodiscover != nil {
adapter := NewAutodiscoverAdapter(crawler.InputsFactory, crawler.ModulesFactory)
adapter := fbautodiscover.NewAutodiscoverAdapter(crawler.InputsFactory, crawler.ModulesFactory)
adiscover, err = autodiscover.NewAutodiscover("filebeat", adapter, config.Autodiscover)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewAutodiscover(name string, adapter Adapter, config *Config) (*Autodiscove
// Init providers
var providers []Provider
for _, providerCfg := range config.Providers {
provider, err := ProviderRegistry.BuildProvider(bus, providerCfg)
provider, err := Registry.BuildProvider(bus, providerCfg)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions libbeat/autodiscover/autodiscover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ func TestNilAutodiscover(t *testing.T) {
func TestAutodiscover(t *testing.T) {
// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)
ProviderRegistry = NewRegistry()
ProviderRegistry.AddProvider("mock", func(b bus.Bus, c *common.Config) (Provider, error) {
Registry = NewRegistry()
Registry.AddProvider("mock", func(b bus.Bus, c *common.Config) (Provider, error) {
// intercept bus to mock events
busChan <- b

Expand Down Expand Up @@ -205,8 +205,8 @@ func TestAutodiscoverHash(t *testing.T) {
// Register mock autodiscover provider
busChan := make(chan bus.Bus, 1)

ProviderRegistry = NewRegistry()
ProviderRegistry.AddProvider("mock", func(b bus.Bus, c *common.Config) (Provider, error) {
Registry = NewRegistry()
Registry.AddProvider("mock", func(b bus.Bus, c *common.Config) (Provider, error) {
// intercept bus to mock events
busChan <- b

Expand Down
Loading

0 comments on commit 326c68f

Please sign in to comment.