From 37ace56f74d81544a5687d42090e51188c3dc5b9 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Fri, 29 May 2020 16:37:32 +0200 Subject: [PATCH 1/2] Fix panic on `metricbeat test modules` (#18797) Since metricbeat light modules support processors (#15923), module initialization requires a publisher in the beat so modules can attach their processors. `metricbeat test modules` is not initializing as normal metricbeat commands, and it is not initializing any output or publisher pipeline, so metricbeat panics when trying to initialize modules with the new method. This change adds a dummy publisher for this case, and fixes also a condition that was adding a `nil` module option, causing additional panics. A test that reproduced the issues is also added. (cherry picked from commit 25b8bf17038c4d659299b8e5b1dd1cafa723ba3f) --- CHANGELOG.next.asciidoc | 1 + metricbeat/cmd/test/modules.go | 17 +++++++++++++++++ metricbeat/mb/module/configuration.go | 2 +- metricbeat/tests/system/test_cmd.py | 15 +++++++++++++++ 4 files changed, 34 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6f01a18bee3..d3cb57323fd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -184,6 +184,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix storage metricset to allow config without region/zone. {issue}17623[17623] {pull}17624[17624] - Fix overflow on Prometheus rates when new buckets are added on the go. {pull}17753[17753] - Fix tags_filter for cloudwatch metricset in aws. {pull}18524[18524] +- Fix panic on `metricbeat test modules` when modules are configured in `metricbeat.modules`. {issue}18789[18789] {pull}18797[18797] - Add missing network.sent_packets_count metric into compute metricset in googlecloud module. {pull}18802[18802] *Packetbeat* diff --git a/metricbeat/cmd/test/modules.go b/metricbeat/cmd/test/modules.go index 93950973a5d..6bf312d17b9 100644 --- a/metricbeat/cmd/test/modules.go +++ b/metricbeat/cmd/test/modules.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cmd/instance" + "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/beats/v7/libbeat/testing" "github.com/elastic/beats/v7/metricbeat/beater" ) @@ -49,6 +50,8 @@ func GenTestModulesCmd(name, beatVersion string, create beat.Creator) *cobra.Com os.Exit(1) } + // A publisher is needed for modules that add their own pipelines + b.Beat.Publisher = newPublisher() mb, err := create(&b.Beat, b.Beat.BeatConfig) if err != nil { fmt.Fprintf(os.Stderr, "Error initializing metricbeat: %s\n", err) @@ -78,3 +81,17 @@ func GenTestModulesCmd(name, beatVersion string, create beat.Creator) *cobra.Com }, } } + +type publisher struct { + beat.PipelineConnector +} + +// newPublisher returns a functional publisher that does nothing. +func newPublisher() *publisher { + return &publisher{pipeline.NewNilPipeline()} +} + +// SetACKHandler is a dummy implementation of the ack handler for the test publisher. +func (*publisher) SetACKHandler(beat.PipelineACKHandler) error { + return nil +} diff --git a/metricbeat/mb/module/configuration.go b/metricbeat/mb/module/configuration.go index 8b0701c0ae9..d5049690b48 100644 --- a/metricbeat/mb/module/configuration.go +++ b/metricbeat/mb/module/configuration.go @@ -30,7 +30,7 @@ func ConfiguredModules(modulesData []*common.Config, configModulesData *common.C var modules []*Wrapper for _, moduleCfg := range modulesData { - module, err := NewWrapper(moduleCfg, mb.Registry, nil) + module, err := NewWrapper(moduleCfg, mb.Registry, moduleOptions...) if err != nil { return nil, err } diff --git a/metricbeat/tests/system/test_cmd.py b/metricbeat/tests/system/test_cmd.py index 740beedb97a..ad9a507d08c 100644 --- a/metricbeat/tests/system/test_cmd.py +++ b/metricbeat/tests/system/test_cmd.py @@ -124,6 +124,21 @@ def test_modules_test(self): assert self.log_contains("cpu...OK") assert self.log_contains("memory...OK") + def test_modules_test_with_module_in_main_config(self): + self.render_config_template(reload=False, modules=[{ + "name": "system", + "metricsets": ["cpu", "memory"], + "period": "10s", + }]) + + exit_code = self.run_beat( + logging_args=None, + extra_args=["test", "modules"]) + + assert exit_code == 0 + assert self.log_contains("cpu...OK") + assert self.log_contains("memory...OK") + def test_modules_test_error(self): """ Test test modules command with an error result From 0792e65d46abb48b3861f3e0840f9811b40116de Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Tue, 2 Jun 2020 13:20:54 +0200 Subject: [PATCH 2/2] Backport nil pipeline from #16715 Co-authored-by: Steffen Siering --- libbeat/publisher/pipeline/nilpipeline.go | 83 +++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 libbeat/publisher/pipeline/nilpipeline.go diff --git a/libbeat/publisher/pipeline/nilpipeline.go b/libbeat/publisher/pipeline/nilpipeline.go new file mode 100644 index 00000000000..f32785a8d22 --- /dev/null +++ b/libbeat/publisher/pipeline/nilpipeline.go @@ -0,0 +1,83 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pipeline + +import "github.com/elastic/beats/v7/libbeat/beat" + +type nilPipeline struct{} + +type nilClient struct { + eventer beat.ClientEventer + ackCount func(int) + ackEvents func([]interface{}) + ackLastEvent func(interface{}) +} + +var _nilPipeline = (*nilPipeline)(nil) + +// NewNilPipeline returns a new pipeline that is compatible with +// beats.PipelineConnector. The pipeline will discard all events that have been +// published. Client ACK handlers will still be executed, but the callbacks +// will be executed immediately when the event is published. +func NewNilPipeline() beat.PipelineConnector { return _nilPipeline } + +func (p *nilPipeline) Connect() (beat.Client, error) { + return p.ConnectWith(beat.ClientConfig{}) +} + +func (p *nilPipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { + return &nilClient{ + eventer: cfg.Events, + ackCount: cfg.ACKCount, + ackEvents: cfg.ACKEvents, + ackLastEvent: cfg.ACKLastEvent, + }, nil +} + +func (c *nilClient) Publish(event beat.Event) { + c.PublishAll([]beat.Event{event}) +} + +func (c *nilClient) PublishAll(events []beat.Event) { + L := len(events) + if L == 0 { + return + } + + if c.ackLastEvent != nil { + c.ackLastEvent(events[L-1].Private) + } + if c.ackEvents != nil { + tmp := make([]interface{}, L) + for i := range events { + tmp[i] = events[i].Private + } + c.ackEvents(tmp) + } + if c.ackCount != nil { + c.ackCount(L) + } +} + +func (c *nilClient) Close() error { + if c.eventer != nil { + c.eventer.Closing() + c.eventer.Closed() + } + return nil +}