Skip to content

Commit

Permalink
Update metricbeat to use new publisher API (#4699)
Browse files Browse the repository at this point in the history
* update metricbeat to use new publisher API

* update libbeat/publisher/testing to support new API only

* Update metricbeat unit tests

* Update libbeat/publisher/testing unit test

* Update filters->processors in system tests

* Update redis test to change semantics on processor

* review fixes
  • Loading branch information
Steffen Siering authored and andrewkroh committed Jul 19, 2017
1 parent b183655 commit a6f43f8
Show file tree
Hide file tree
Showing 22 changed files with 377 additions and 438 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d
This will make the CPU usage percentages from the system cpu metricset consistent
with the system process metricset. The documentation for these metrics already
stated that on multi-core systems the percentages could be greater than 100%. {pull}4544[4544]
- Remove filters setting from metricbeat modules. {pull}4699[4699]

*Packetbeat*

Expand Down Expand Up @@ -105,6 +106,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d
- Add the ability to configure kernel's audit failure mode. {pull}4516[4516]
- Add experimental Aerospike module. {pull}4560[4560]
- Vsphere module: collect custom fields from virtual machines. {issue}4464[4464]
- Add `processors` setting to metricbeat modules. {pull}4699[4699]

*Packetbeat*

Expand Down
59 changes: 15 additions & 44 deletions libbeat/publisher/testing/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,43 @@ package testing

// ChanClient implements Client interface, forwarding published events to some
import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
"github.com/elastic/beats/libbeat/publisher/beat"
)

type TestPublisher struct {
client publisher.Client
client beat.Client
}

// given channel only.
type ChanClient struct {
done chan struct{}
Channel chan PublishMessage

recvBuf []common.MapStr
}

type PublishMessage struct {
Context publisher.Context
Events []common.MapStr
Channel chan beat.Event
}

func PublisherWithClient(client publisher.Client) publisher.Publisher {
func PublisherWithClient(client beat.Client) publisher.Publisher {
return &TestPublisher{client}
}

func (pub *TestPublisher) Connect() publisher.Client {
return pub.client
panic("Not supported")
}

func (pub *TestPublisher) ConnectX(_ beat.ClientConfig) (beat.Client, error) {
panic("Not supported")
return pub.client, nil
}

func (pub *TestPublisher) SetACKHandler(_ beat.PipelineACKHandler) error {
panic("Not supported")
}

func NewChanClient(bufSize int) *ChanClient {
return NewChanClientWith(make(chan PublishMessage, bufSize))
return NewChanClientWith(make(chan beat.Event, bufSize))
}

func NewChanClientWith(ch chan PublishMessage) *ChanClient {
func NewChanClientWith(ch chan beat.Event) *ChanClient {
if ch == nil {
ch = make(chan PublishMessage, 1)
ch = make(chan beat.Event, 1)
}
c := &ChanClient{
done: make(chan struct{}),
Expand All @@ -62,40 +54,19 @@ func (c *ChanClient) Close() error {

// PublishEvent will publish the event on the channel. Options will be ignored.
// Always returns true.
func (c *ChanClient) PublishEvent(event common.MapStr, opts ...publisher.ClientOption) bool {
return c.PublishEvents([]common.MapStr{event}, opts...)
}

// PublishEvents publishes all event on the configured channel. Options will be ignored.
// Always returns true.
func (c *ChanClient) PublishEvents(events []common.MapStr, opts ...publisher.ClientOption) bool {
_, ctx := publisher.MakeContext(opts)
msg := PublishMessage{ctx, events}
func (c *ChanClient) Publish(event beat.Event) {
select {
case <-c.done:
return false
case c.Channel <- msg:
return true
case c.Channel <- event:
}
}

func (c *ChanClient) ReceiveEvent() common.MapStr {
if len(c.recvBuf) > 0 {
evt := c.recvBuf[0]
c.recvBuf = c.recvBuf[1:]
return evt
func (c *ChanClient) PublishAll(event []beat.Event) {
for _, e := range event {
c.Publish(e)
}

msg := <-c.Channel
c.recvBuf = msg.Events
return c.ReceiveEvent()
}

func (c *ChanClient) ReceiveEvents() []common.MapStr {
if len(c.recvBuf) > 0 {
return c.recvBuf
}

msg := <-c.Channel
return msg.Events
func (c *ChanClient) ReceiveEvent() beat.Event {
return <-c.Channel
}
16 changes: 10 additions & 6 deletions libbeat/publisher/testing/testing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@ import (
"testing"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher/beat"
"github.com/stretchr/testify/assert"
)

var cnt = 0

func testEvent() common.MapStr {
event := common.MapStr{}
event["message"] = "test"
event["idx"] = cnt
func testEvent() beat.Event {
event := beat.Event{
Fields: common.MapStr{
"message": "test",
"idx": cnt,
},
}
cnt++
return event
}
Expand All @@ -21,7 +25,7 @@ func testEvent() common.MapStr {
func TestChanClientPublishEvent(t *testing.T) {
cc := NewChanClient(1)
e1 := testEvent()
cc.PublishEvent(e1)
cc.Publish(e1)
assert.Equal(t, e1, cc.ReceiveEvent())
}

Expand All @@ -30,7 +34,7 @@ func TestChanClientPublishEvents(t *testing.T) {
cc := NewChanClient(1)

e1, e2 := testEvent(), testEvent()
cc.PublishEvents([]common.MapStr{e1, e2})
go cc.PublishAll([]beat.Event{e1, e2})
assert.Equal(t, e1, cc.ReceiveEvent())
assert.Equal(t, e2, cc.ReceiveEvent())
}
64 changes: 52 additions & 12 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/module"
"github.com/joeshaw/multierror"

"github.com/elastic/beats/libbeat/cfgfile"
"github.com/pkg/errors"
Expand All @@ -19,12 +19,16 @@ import (

// Metricbeat implements the Beater interface for metricbeat.
type Metricbeat struct {
done chan struct{} // Channel used to initiate shutdown.
modules []*module.Wrapper // Active list of modules.
client publisher.Client // Publisher client.
done chan struct{} // Channel used to initiate shutdown.
modules []staticModule // Active list of modules.
config Config
}

type staticModule struct {
connector *module.Connector
module *module.Wrapper
}

// New creates and returns a new Metricbeat instance.
func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// List all registered modules and metricsets.
Expand All @@ -35,14 +39,45 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
return nil, errors.Wrap(err, "error reading configuration file")
}

modules, err := module.NewWrappers(config.MaxStartDelay, config.Modules, mb.Registry)
if err != nil {
// Empty config is fine if dynamic config is enabled
if !config.ConfigModules.Enabled() {
return nil, err
} else if err != mb.ErrEmptyConfig && err != mb.ErrAllModulesDisabled {
return nil, err
dynamicCfgEnabled := config.ConfigModules.Enabled()
if !dynamicCfgEnabled && len(config.Modules) == 0 {
return nil, mb.ErrEmptyConfig
}

var errs multierror.Errors
var modules []staticModule
for _, moduleCfg := range config.Modules {
if !moduleCfg.Enabled() {
continue
}

failed := false
connector, err := module.NewConnector(b.Publisher, moduleCfg)
if err != nil {
errs = append(errs, err)
failed = true
}

module, err := module.NewWrapper(config.MaxStartDelay, moduleCfg, mb.Registry)
if err != nil {
errs = append(errs, err)
failed = true
}

if failed {
continue
}
modules = append(modules, staticModule{
connector: connector,
module: module,
})
}

if err := errs.Err(); err != nil {
return nil, err
}
if len(modules) == 0 && !dynamicCfgEnabled {
return nil, mb.ErrAllModulesDisabled
}

mb := &Metricbeat{
Expand All @@ -62,7 +97,12 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
var wg sync.WaitGroup

for _, m := range bt.modules {
r := module.NewRunner(b.Publisher.Connect, m)
client, err := m.connector.Connect()
if err != nil {
return err
}

r := module.NewRunner(client, m.module)
r.Start()
wg.Add(1)
go func() {
Expand Down
Loading

0 comments on commit a6f43f8

Please sign in to comment.