diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go index ebd5983a3f7..d2a20dd54a2 100644 --- a/filebeat/channel/connector.go +++ b/filebeat/channel/connector.go @@ -18,12 +18,11 @@ package channel import ( - "fmt" - "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/add_formatted_index" ) // ConnectorFunc is an adapter for using ordinary functions as Connector. @@ -34,14 +33,6 @@ type pipelineConnector struct { pipeline beat.Pipeline } -// addFormattedIndex is a Processor to set an event's "raw_index" metadata field -// with a given TimestampFormatString. The elasticsearch output interprets -// that field as specifying the (raw string) index the event should be sent to; -// in other outputs it is just included in the metadata. -type addFormattedIndex struct { - formatString *fmtstr.TimestampFormatString -} - // Connect passes the cfg and the zero value of beat.ClientConfig to the underlying function. func (fn ConnectorFunc) Connect(cfg *common.Config) (Outleter, error) { return fn(cfg, beat.ClientConfig{}) @@ -132,13 +123,13 @@ func processorsForConfig( if err != nil { return nil, err } - indexProcessor := &addFormattedIndex{timestampFormat} - procs.List = append(procs.List, indexProcessor) + indexProcessor := add_formatted_index.New(timestampFormat) + procs.AddProcessor(indexProcessor) } // 2. ClientConfig processors if lst := clientCfg.Processing.Processor; lst != nil { - procs.List = append(procs.List, lst) + procs.AddProcessor(lst) } // 3. User processors @@ -146,34 +137,7 @@ func processorsForConfig( if err != nil { return nil, err } - // Subtlety: it is important here that we append the individual elements of - // userProcessors, rather than userProcessors itself, even though - // userProcessors implements the processors.Processor interface. This is - // because the contents of what we return are later pulled out into a - // processing.group rather than a processors.Processors, and the two have - // different error semantics: processors.Processors aborts processing on - // any error, whereas processing.group only aborts on fatal errors. The - // latter is the most common behavior, and the one we are preserving here for - // backwards compatibility. - // We are unhappy about this and have plans to fix this inconsistency at a - // higher level, but for now we need to respect the existing semantics. - procs.List = append(procs.List, userProcessors.List...) - return procs, nil -} + procs.AddProcessors(*userProcessors) -func (p *addFormattedIndex) Run(event *beat.Event) (*beat.Event, error) { - index, err := p.formatString.Run(event.Timestamp) - if err != nil { - return nil, err - } - - if event.Meta == nil { - event.Meta = common.MapStr{} - } - event.Meta["raw_index"] = index - return event, nil -} - -func (p *addFormattedIndex) String() string { - return fmt.Sprintf("add_index_pattern=%v", p.formatString) + return procs, nil } diff --git a/journalbeat/beater/journalbeat.go b/journalbeat/beater/journalbeat.go index aefe6f8abe1..7a54163f832 100644 --- a/journalbeat/beater/journalbeat.go +++ b/journalbeat/beater/journalbeat.go @@ -67,7 +67,7 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { var inputs []*input.Input for _, c := range config.Inputs { - i, err := input.New(c, b.Publisher, done, cp.States()) + i, err := input.New(c, b, done, cp.States()) if err != nil { return nil, err } diff --git a/journalbeat/docs/config-options.asciidoc b/journalbeat/docs/config-options.asciidoc index a52f741c0c8..37eac9153d9 100644 --- a/journalbeat/docs/config-options.asciidoc +++ b/journalbeat/docs/config-options.asciidoc @@ -215,3 +215,16 @@ available: `CONTAINER_NAME`:: `container.name` `CONTAINER_PARTIAL_MESSAGE`:: `container.partial` `CONTAINER_TAG`:: `container.image.tag` + +[float] +[id="index"] +==== `index` + +If present, this formatted string overrides the index for events from this input +(for elasticsearch outputs), or sets the `raw_index` field of the event's +metadata (for other outputs). This string can only refer to the agent name and +version and the event timestamp; for access to dynamic fields, use +`output.elasticsearch.index` or a processor. + +Example value: `"%{[agent.name]}-myindex-%{+yyyy.MM.dd}"` might +expand to `"journalbeat-myindex-2019.12.13"`. diff --git a/journalbeat/input/config.go b/journalbeat/input/config.go index 5ed5f49101f..3916f9ad515 100644 --- a/journalbeat/input/config.go +++ b/journalbeat/input/config.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/beats/journalbeat/config" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/processors" ) @@ -47,6 +48,8 @@ type Config struct { common.EventMetadata `config:",inline"` // Processors to run on events. Processors processors.PluginConfig `config:"processors"` + // ES output index pattern + Index fmtstr.EventFormatString `config:"index"` } var ( diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index d45c69e0812..b3a74dd46b8 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -21,6 +21,10 @@ import ( "fmt" "sync" + "github.com/elastic/beats/libbeat/processors/add_formatted_index" + + "github.com/elastic/beats/libbeat/common/fmtstr" + "github.com/gofrs/uuid" "github.com/elastic/beats/journalbeat/checkpoint" @@ -48,7 +52,7 @@ type Input struct { // New returns a new Inout func New( c *common.Config, - pipeline beat.Pipeline, + b *beat.Beat, done chan struct{}, states map[string]checkpoint.JournalState, ) (*Input, error) { @@ -102,7 +106,7 @@ func New( readers = append(readers, r) } - processors, err := processors.New(config.Processors) + inputProcessors, err := processorsForInput(b.Info, config) if err != nil { return nil, err } @@ -113,12 +117,12 @@ func New( readers: readers, done: done, config: config, - pipeline: pipeline, + pipeline: b.Publisher, states: states, id: id, logger: logger, eventMeta: config.EventMetadata, - processors: processors, + processors: inputProcessors, }, nil } @@ -203,3 +207,29 @@ func (i *Input) Stop() { func (i *Input) Wait() { i.Stop() } + +func processorsForInput(beatInfo beat.Info, config Config) (*processors.Processors, error) { + procs := processors.NewList(nil) + + // Processor ordering is important: + // 1. Index configuration + if !config.Index.IsEmpty() { + staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version) + timestampFormat, err := + fmtstr.NewTimestampFormatString(&config.Index, staticFields) + if err != nil { + return nil, err + } + indexProcessor := add_formatted_index.New(timestampFormat) + procs.AddProcessor(indexProcessor) + } + + // 2. User processors + userProcessors, err := processors.New(config.Processors) + if err != nil { + return nil, err + } + procs.AddProcessors(*userProcessors) + + return procs, nil +} diff --git a/journalbeat/input/input_test.go b/journalbeat/input/input_test.go new file mode 100644 index 00000000000..3cbc416133e --- /dev/null +++ b/journalbeat/input/input_test.go @@ -0,0 +1,164 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package input + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" + _ "github.com/elastic/beats/libbeat/processors/actions" +) + +func TestProcessorsForInput(t *testing.T) { + testCases := map[string]struct { + beatInfo beat.Info + configStr string + event beat.Event + expectedFields map[string]string + }{ + "Simple static index": { + configStr: "index: 'test'", + expectedFields: map[string]string{ + "@metadata.raw_index": "test", + }, + }, + "Index with agent info + timestamp": { + beatInfo: beat.Info{Beat: "TestBeat", Version: "3.9.27"}, + configStr: "index: 'beat-%{[agent.name]}-%{[agent.version]}-%{+yyyy.MM.dd}'", + event: beat.Event{Timestamp: time.Date(1999, time.December, 31, 23, 0, 0, 0, time.UTC)}, + expectedFields: map[string]string{ + "@metadata.raw_index": "beat-TestBeat-3.9.27-1999.12.31", + }, + }, + "Set field in input config": { + configStr: `processors: [add_fields: {fields: {testField: inputConfig}}]`, + expectedFields: map[string]string{ + "fields.testField": "inputConfig", + }, + }, + } + for description, test := range testCases { + if test.event.Fields == nil { + test.event.Fields = common.MapStr{} + } + config, err := inputConfigFromString(test.configStr) + if err != nil { + t.Errorf("[%s] %v", description, err) + continue + } + processors, err := processorsForInput(test.beatInfo, config) + if err != nil { + t.Errorf("[%s] %v", description, err) + continue + } + processedEvent, err := processors.Run(&test.event) + // We don't check if err != nil, because we are testing the final outcome + // of running the processors, including when some of them fail. + if processedEvent == nil { + t.Errorf("[%s] Unexpected fatal error running processors: %v\n", + description, err) + } + for key, value := range test.expectedFields { + field, err := processedEvent.GetValue(key) + if err != nil { + t.Errorf("[%s] Couldn't get field %s from event: %v", description, key, err) + continue + } + assert.Equal(t, field, value) + fieldStr, ok := field.(string) + if !ok { + // Note that requiring a string here is just to simplify the test setup, + // not a requirement of the underlying api. + t.Errorf("[%s] Field [%s] should be a string", description, key) + continue + } + if fieldStr != value { + t.Errorf("[%s] Event field [%s]: expected [%s], got [%s]", description, key, value, fieldStr) + } + } + } +} + +func TestProcessorsForInputIsFlat(t *testing.T) { + // This test is regrettable, and exists because of inconsistencies in + // processor handling between processors.Processors and processing.group + // (which implements beat.ProcessorList) -- see processorsForConfig for + // details. The upshot is that, for now, if the input configuration specifies + // processors, they must be returned as direct children of the resulting + // processors.Processors (rather than being collected in additional tree + // structure). + // This test should be removed once we have a more consistent mechanism for + // collecting and running processors. + configStr := `processors: +- add_fields: {fields: {testField: value}} +- add_fields: {fields: {testField2: stuff}}` + config, err := inputConfigFromString(configStr) + if err != nil { + t.Fatal(err) + } + processors, err := processorsForInput( + beat.Info{}, config) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, 2, len(processors.List)) +} + +// setRawIndex is a bare-bones processor to set the raw_index field to a +// constant string in the event metadata. It is used to test order of operations +// for processorsForConfig. +type setRawIndex struct { + indexStr string +} + +func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) { + if event.Meta == nil { + event.Meta = common.MapStr{} + } + event.Meta["raw_index"] = p.indexStr + return event, nil +} + +func (p *setRawIndex) String() string { + return fmt.Sprintf("set_raw_index=%v", p.indexStr) +} + +// Helper function to convert from YML input string to an unpacked +// Config +func inputConfigFromString(s string) (Config, error) { + config := Config{} + cfg, err := common.NewConfigFrom(s) + if err != nil { + return config, err + } + err = cfg.Unpack(&config) + return config, err +} + +// makeProcessors wraps one or more bare Processor objects in Processors. +func makeProcessors(procs ...processors.Processor) *processors.Processors { + procList := processors.NewList(nil) + procList.List = procs + return procList +} diff --git a/libbeat/processors/add_formatted_index/add_formatted_index.go b/libbeat/processors/add_formatted_index/add_formatted_index.go new file mode 100644 index 00000000000..ed947be7349 --- /dev/null +++ b/libbeat/processors/add_formatted_index/add_formatted_index.go @@ -0,0 +1,57 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package add_formatted_index + +import ( + "fmt" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" +) + +// AddFormattedIndex is a Processor to set an event's "raw_index" metadata field +// with a given TimestampFormatString. The elasticsearch output interprets +// that field as specifying the (raw string) index the event should be sent to; +// in other outputs it is just included in the metadata. +type AddFormattedIndex struct { + formatString *fmtstr.TimestampFormatString +} + +// New returns a new AddFormattedIndex processor. +func New(formatString *fmtstr.TimestampFormatString) *AddFormattedIndex { + return &AddFormattedIndex{formatString} +} + +// Run runs the processor. +func (p *AddFormattedIndex) Run(event *beat.Event) (*beat.Event, error) { + index, err := p.formatString.Run(event.Timestamp) + if err != nil { + return nil, err + } + + if event.Meta == nil { + event.Meta = common.MapStr{} + } + event.Meta["raw_index"] = index + return event, nil +} + +func (p *AddFormattedIndex) String() string { + return fmt.Sprintf("add_index_pattern=%v", p.formatString) +} diff --git a/libbeat/processors/processor.go b/libbeat/processors/processor.go index 56900a70d9d..5eaa6dd2fb4 100644 --- a/libbeat/processors/processor.go +++ b/libbeat/processors/processor.go @@ -60,7 +60,7 @@ func New(config PluginConfig) (*Processors, error) { if err != nil { return nil, errors.Wrap(err, "failed to make if/then/else processor") } - procs.add(p) + procs.AddProcessor(p) continue } @@ -94,7 +94,7 @@ func New(config PluginConfig) (*Processors, error) { return nil, err } - procs.add(plugin) + procs.AddProcessor(plugin) } if len(procs.List) > 0 { @@ -103,10 +103,27 @@ func New(config PluginConfig) (*Processors, error) { return procs, nil } -func (procs *Processors) add(p Processor) { +// AddProcessor adds a single Processor to Processors +func (procs *Processors) AddProcessor(p Processor) { procs.List = append(procs.List, p) } +// AddProcessors adds more Processors to Processors +func (procs *Processors) AddProcessors(p Processors) { + // Subtlety: it is important here that we append the individual elements of + // p, rather than p itself, even though + // p implements the processors.Processor interface. This is + // because the contents of what we return are later pulled out into a + // processing.group rather than a processors.Processors, and the two have + // different error semantics: processors.Processors aborts processing on + // any error, whereas processing.group only aborts on fatal errors. The + // latter is the most common behavior, and the one we are preserving here for + // backwards compatibility. + // We are unhappy about this and have plans to fix this inconsistency at a + // higher level, but for now we need to respect the existing semantics. + procs.List = append(procs.List, p.List...) +} + // RunBC (run backwards-compatible) applies the processors, by providing the // old interface based on common.MapStr. // The event us temporarily converted to beat.Event. By this 'conversion' the