From ac9434b1d8dcfcc3f67c645a30ec018a56b24f1b Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 16 Dec 2019 13:32:43 -0500 Subject: [PATCH] [metricbeat] [auditbeat] Add formatted index option to metricbeat / auditbeat modules (#15100) --- CHANGELOG.next.asciidoc | 1 + metricbeat/beater/metricbeat.go | 6 +- metricbeat/docs/metricbeat-options.asciidoc | 14 ++- metricbeat/mb/module/connector.go | 39 ++++++- metricbeat/mb/module/connector_test.go | 106 ++++++++++++++++++++ metricbeat/mb/module/example_test.go | 2 +- metricbeat/mb/module/factory.go | 10 +- 7 files changed, 167 insertions(+), 11 deletions(-) create mode 100644 metricbeat/mb/module/connector_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index dff59c93d1f..a788d5f7fb1 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -553,6 +553,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add billing metricset in aws modules. {pull}14801[14801] {issue}14934[14934] - Add AWS SNS metricset. {pull}14946[14946] - Add overview dashboard for AWS SNS module {pull}14977[14977] +- Add `index` option to all modules to specify a module-specific output index. {pull}15100[15100] *Packetbeat* diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index f071d6e7bff..8d27d806666 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -170,7 +170,7 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe failed := false - connector, err := module.NewConnector(b.Publisher, moduleCfg, nil) + connector, err := module.NewConnector(b.Info, b.Publisher, moduleCfg, nil) if err != nil { errs = append(errs, err) failed = true @@ -201,7 +201,7 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe if config.Autodiscover != nil { var err error - factory := module.NewFactory(metricbeat.moduleOptions...) + factory := module.NewFactory(b.Info, metricbeat.moduleOptions...) adapter := autodiscover.NewFactoryAdapter(factory) metricbeat.autodiscover, err = autodiscover.NewAutodiscover("metricbeat", b.Publisher, adapter, config.Autodiscover) if err != nil { @@ -238,7 +238,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { } // Centrally managed modules - factory := module.NewFactory(bt.moduleOptions...) + factory := module.NewFactory(b.Info, bt.moduleOptions...) modules := cfgfile.NewRunnerList(management.DebugK, factory, b.Publisher) reload.Register.MustRegisterList(b.Info.Beat+".modules", modules) wg.Add(1) diff --git a/metricbeat/docs/metricbeat-options.asciidoc b/metricbeat/docs/metricbeat-options.asciidoc index 675d4626226..8c26d32efe0 100644 --- a/metricbeat/docs/metricbeat-options.asciidoc +++ b/metricbeat/docs/metricbeat-options.asciidoc @@ -214,6 +214,18 @@ A list of processors to apply to the data generated by the metricset. See <> for information about specifying processors in your config. +[float] +==== `index` + +If present, this formatted string overrides the index for events from this +module (for elasticsearch outputs), or sets the `raw_index` field of the event's +metadata (for other outputs). This string can only refer to the agent name and +version and the event timestamp; for access to dynamic fields, use +`output.elasticsearch.index` or a processor. + +Example value: `"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"` might +expand to `"metricbeat-myindex-2019.12.13"`. + [float] ==== `keep_null` @@ -289,7 +301,7 @@ as the first segment in the HTTP URI path. [float] ==== `query` -An optional value to pass common query params in YAML. Instead of setting the query params +An optional value to pass common query params in YAML. Instead of setting the query params within hosts values using the syntax `?key=value&key2&value2`, you can set it here like this: [source,yaml] diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go index cb8233a65cc..b85aecc9328 100644 --- a/metricbeat/mb/module/connector.go +++ b/metricbeat/mb/module/connector.go @@ -20,7 +20,9 @@ package module import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/add_formatted_index" ) // Connector configures and establishes a beat.Client for publishing events @@ -36,6 +38,8 @@ type Connector struct { type connectorConfig struct { Processors processors.PluginConfig `config:"processors"` + // ES output index pattern + Index fmtstr.EventFormatString `config:"index"` // KeepNull determines whether published events will keep null values or omit them. KeepNull bool `config:"keep_null"` @@ -43,13 +47,16 @@ type connectorConfig struct { common.EventMetadata `config:",inline"` // Fields and tags to add to events. } -func NewConnector(pipeline beat.Pipeline, c *common.Config, dynFields *common.MapStrPointer) (*Connector, error) { +func NewConnector( + beatInfo beat.Info, pipeline beat.Pipeline, + c *common.Config, dynFields *common.MapStrPointer, +) (*Connector, error) { config := connectorConfig{} if err := c.Unpack(&config); err != nil { return nil, err } - processors, err := processors.New(config.Processors) + processors, err := processorsForConfig(beatInfo, config) if err != nil { return nil, err } @@ -73,3 +80,31 @@ func (c *Connector) Connect() (beat.Client, error) { }, }) } + +// processorsForConfig assembles the Processors for a Connector. +func processorsForConfig( + beatInfo beat.Info, config connectorConfig, +) (*processors.Processors, error) { + procs := processors.NewList(nil) + + // Processor order is important! The index processor, if present, must be + // added before the user processors. + if !config.Index.IsEmpty() { + staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version) + timestampFormat, err := + fmtstr.NewTimestampFormatString(&config.Index, staticFields) + if err != nil { + return nil, err + } + indexProcessor := add_formatted_index.New(timestampFormat) + procs.AddProcessor(indexProcessor) + } + + userProcs, err := processors.New(config.Processors) + if err != nil { + return nil, err + } + procs.AddProcessors(*userProcs) + + return procs, nil +} diff --git a/metricbeat/mb/module/connector_test.go b/metricbeat/mb/module/connector_test.go new file mode 100644 index 00000000000..40789374bf8 --- /dev/null +++ b/metricbeat/mb/module/connector_test.go @@ -0,0 +1,106 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package module + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +func TestProcessorsForConfig(t *testing.T) { + testCases := map[string]struct { + beatInfo beat.Info + configStr string + event beat.Event + expectedFields map[string]string + }{ + "Simple static index": { + configStr: "index: 'test'", + expectedFields: map[string]string{ + "@metadata.raw_index": "test", + }, + }, + "Index with agent info + timestamp": { + beatInfo: beat.Info{Beat: "TestBeat", Version: "3.9.27"}, + configStr: "index: 'beat-%{[agent.name]}-%{[agent.version]}-%{+yyyy.MM.dd}'", + event: beat.Event{Timestamp: time.Date(1999, time.December, 31, 23, 0, 0, 0, time.UTC)}, + expectedFields: map[string]string{ + "@metadata.raw_index": "beat-TestBeat-3.9.27-1999.12.31", + }, + }, + } + for description, test := range testCases { + if test.event.Fields == nil { + test.event.Fields = common.MapStr{} + } + config, err := connectorConfigFromString(test.configStr) + if err != nil { + t.Errorf("[%s] %v", description, err) + continue + } + processors, err := processorsForConfig(test.beatInfo, config) + if err != nil { + t.Errorf("[%s] %v", description, err) + continue + } + processedEvent, err := processors.Run(&test.event) + // We don't check if err != nil, because we are testing the final outcome + // of running the processors, including when some of them fail. + if processedEvent == nil { + t.Errorf("[%s] Unexpected fatal error running processors: %v\n", + description, err) + } + for key, value := range test.expectedFields { + field, err := processedEvent.GetValue(key) + if err != nil { + t.Errorf("[%s] Couldn't get field %s from event: %v", description, key, err) + continue + } + assert.Equal(t, field, value) + fieldStr, ok := field.(string) + if !ok { + // Note that requiring a string here is just to simplify the test setup, + // not a requirement of the underlying api. + t.Errorf("[%s] Field [%s] should be a string", description, key) + continue + } + if fieldStr != value { + t.Errorf("[%s] Event field [%s]: expected [%s], got [%s]", description, key, value, fieldStr) + } + } + } +} + +// Helper function to convert from YML input string to an unpacked +// connectorConfig +func connectorConfigFromString(s string) (connectorConfig, error) { + config := connectorConfig{} + cfg, err := common.NewConfigFrom(s) + if err != nil { + return config, err + } + if err := cfg.Unpack(&config); err != nil { + return config, err + } + return config, nil +} diff --git a/metricbeat/mb/module/example_test.go b/metricbeat/mb/module/example_test.go index 20d7e192adb..e5fc7e81e83 100644 --- a/metricbeat/mb/module/example_test.go +++ b/metricbeat/mb/module/example_test.go @@ -132,7 +132,7 @@ func ExampleRunner() { return } - connector, err := module.NewConnector(b.Publisher, config, nil) + connector, err := module.NewConnector(b.Info, b.Publisher, config, nil) if err != nil { return } diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index eaaca0b84b3..8661bee3d03 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -29,13 +29,15 @@ import ( // Factory creates new Runner instances from configuration objects. // It is used to register and reload modules. type Factory struct { - options []Option + beatInfo beat.Info + options []Option } // NewFactory creates new Reloader instance for the given config -func NewFactory(options ...Option) *Factory { +func NewFactory(beatInfo beat.Info, options ...Option) *Factory { return &Factory{ - options: options, + beatInfo: beatInfo, + options: options, } } @@ -43,7 +45,7 @@ func NewFactory(options ...Option) *Factory { func (r *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { var errs multierror.Errors - connector, err := NewConnector(p, c, meta) + connector, err := NewConnector(r.beatInfo, p, c, meta) if err != nil { errs = append(errs, err) }