Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #18797 to 7.7: Fix panic on metricbeat test modules #18853

Merged
merged 3 commits into from
Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix storage metricset to allow config without region/zone. {issue}17623[17623] {pull}17624[17624]
- Fix overflow on Prometheus rates when new buckets are added on the go. {pull}17753[17753]
- Fix tags_filter for cloudwatch metricset in aws. {pull}18524[18524]
- Fix panic on `metricbeat test modules` when modules are configured in `metricbeat.modules`. {issue}18789[18789] {pull}18797[18797]
- Add missing network.sent_packets_count metric into compute metricset in googlecloud module. {pull}18802[18802]

*Packetbeat*
Expand Down
83 changes: 83 additions & 0 deletions libbeat/publisher/pipeline/nilpipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pipeline

import "github.com/elastic/beats/v7/libbeat/beat"

type nilPipeline struct{}

type nilClient struct {
eventer beat.ClientEventer
ackCount func(int)
ackEvents func([]interface{})
ackLastEvent func(interface{})
}

var _nilPipeline = (*nilPipeline)(nil)

// NewNilPipeline returns a new pipeline that is compatible with
// beats.PipelineConnector. The pipeline will discard all events that have been
// published. Client ACK handlers will still be executed, but the callbacks
// will be executed immediately when the event is published.
func NewNilPipeline() beat.PipelineConnector { return _nilPipeline }

func (p *nilPipeline) Connect() (beat.Client, error) {
return p.ConnectWith(beat.ClientConfig{})
}

func (p *nilPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
return &nilClient{
eventer: cfg.Events,
ackCount: cfg.ACKCount,
ackEvents: cfg.ACKEvents,
ackLastEvent: cfg.ACKLastEvent,
}, nil
}

func (c *nilClient) Publish(event beat.Event) {
c.PublishAll([]beat.Event{event})
}

func (c *nilClient) PublishAll(events []beat.Event) {
L := len(events)
if L == 0 {
return
}

if c.ackLastEvent != nil {
c.ackLastEvent(events[L-1].Private)
}
if c.ackEvents != nil {
tmp := make([]interface{}, L)
for i := range events {
tmp[i] = events[i].Private
}
c.ackEvents(tmp)
}
if c.ackCount != nil {
c.ackCount(L)
}
}

func (c *nilClient) Close() error {
if c.eventer != nil {
c.eventer.Closing()
c.eventer.Closed()
}
return nil
}
17 changes: 17 additions & 0 deletions metricbeat/cmd/test/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/publisher/pipeline"
"github.com/elastic/beats/v7/libbeat/testing"
"github.com/elastic/beats/v7/metricbeat/beater"
)
Expand All @@ -49,6 +50,8 @@ func GenTestModulesCmd(name, beatVersion string, create beat.Creator) *cobra.Com
os.Exit(1)
}

// A publisher is needed for modules that add their own pipelines
b.Beat.Publisher = newPublisher()
mb, err := create(&b.Beat, b.Beat.BeatConfig)
if err != nil {
fmt.Fprintf(os.Stderr, "Error initializing metricbeat: %s\n", err)
Expand Down Expand Up @@ -78,3 +81,17 @@ func GenTestModulesCmd(name, beatVersion string, create beat.Creator) *cobra.Com
},
}
}

type publisher struct {
beat.PipelineConnector
}

// newPublisher returns a functional publisher that does nothing.
func newPublisher() *publisher {
return &publisher{pipeline.NewNilPipeline()}
}

// SetACKHandler is a dummy implementation of the ack handler for the test publisher.
func (*publisher) SetACKHandler(beat.PipelineACKHandler) error {
return nil
}
2 changes: 1 addition & 1 deletion metricbeat/mb/module/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func ConfiguredModules(modulesData []*common.Config, configModulesData *common.C
var modules []*Wrapper

for _, moduleCfg := range modulesData {
module, err := NewWrapper(moduleCfg, mb.Registry, nil)
module, err := NewWrapper(moduleCfg, mb.Registry, moduleOptions...)
if err != nil {
return nil, err
}
Expand Down
15 changes: 15 additions & 0 deletions metricbeat/tests/system/test_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,21 @@ def test_modules_test(self):
assert self.log_contains("cpu...OK")
assert self.log_contains("memory...OK")

def test_modules_test_with_module_in_main_config(self):
self.render_config_template(reload=False, modules=[{
"name": "system",
"metricsets": ["cpu", "memory"],
"period": "10s",
}])

exit_code = self.run_beat(
logging_args=None,
extra_args=["test", "modules"])

assert exit_code == 0
assert self.log_contains("cpu...OK")
assert self.log_contains("memory...OK")

def test_modules_test_error(self):
"""
Test test modules command with an error result
Expand Down