diff --git a/libbeat/common/mapstr.go b/libbeat/common/mapstr.go index c38977d10f23..3bf851266e7d 100644 --- a/libbeat/common/mapstr.go +++ b/libbeat/common/mapstr.go @@ -11,9 +11,8 @@ import ( // Event metadata constants. These keys are used within libbeat to identify // metadata stored in an event. const ( - EventMetadataKey = "_event_metadata" - FieldsKey = "fields" - TagsKey = "tags" + FieldsKey = "fields" + TagsKey = "tags" ) var ( diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 1e80a0586bfd..8af8ee3bdf7d 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -60,10 +60,13 @@ type pipelineProcessors struct { // The pipeline its processor settings for // constructing the clients complete processor // pipeline on connect. - beatMetaProcessor beat.Processor - eventMetaProcessor beat.Processor - processors beat.Processor - disabled bool // disabled is set if outputs have been disabled via CLI + beatsMeta common.MapStr + fields common.MapStr + tags []string + + processors beat.Processor + + disabled bool // disabled is set if outputs have been disabled via CLI } // Settings is used to pass additional settings to a newly created pipeline instance. @@ -132,39 +135,17 @@ func New( out outputs.Group, settings Settings, ) (*Pipeline, error) { - annotations := settings.Annotations var err error - var beatMeta beat.Processor - if meta := annotations.Beat; meta != nil { - beatMeta = beatAnnotateProcessor(meta) - } - - var eventMeta beat.Processor - if em := annotations.Event; len(em.Fields) > 0 || len(em.Tags) > 0 { - eventMeta = eventAnnotateProcessor(em) - } - - var prog beat.Processor - if ps := settings.Processors; ps != nil && len(ps.List) > 0 { - tmp := &program{title: "global"} - for _, p := range ps.List { - tmp.add(p) - } - prog = tmp - } - log := defaultLogger + annotations := settings.Annotations + processors := settings.Processors + disabledOutput := settings.Disabled p := &Pipeline{ logger: log, waitCloseMode: settings.WaitCloseMode, waitCloseTimeout: settings.WaitClose, - processors: pipelineProcessors{ - beatMetaProcessor: beatMeta, - eventMetaProcessor: eventMeta, - processors: prog, - disabled: settings.Disabled, - }, + processors: makePipelineProcessors(annotations, processors, disabledOutput), } p.ackBuilder = &pipelineEmptyACK{p} p.ackActive = atomic.MakeBool(true) @@ -382,3 +363,38 @@ func (e *waitCloser) dec(n int) { func (e *waitCloser) wait() { e.events.Wait() } + +func makePipelineProcessors( + annotations Annotations, + processors *processors.Processors, + disabled bool, +) pipelineProcessors { + p := pipelineProcessors{ + disabled: disabled, + } + + hasProcessors := processors != nil && len(processors.List) > 0 + if hasProcessors { + tmp := &program{title: "global"} + for _, p := range processors.List { + tmp.add(p) + } + p.processors = tmp + } + + if meta := annotations.Beat; meta != nil { + p.beatsMeta = common.MapStr{"beat": meta} + } + + if em := annotations.Event; len(em.Fields) > 0 { + fields := common.MapStr{} + common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot) + p.fields = fields + } + + if t := annotations.Event.Tags; len(t) > 0 { + p.tags = t + } + + return p +} diff --git a/libbeat/publisher/pipeline/processor.go b/libbeat/publisher/pipeline/processor.go index 597af454011c..209150f58219 100644 --- a/libbeat/publisher/pipeline/processor.go +++ b/libbeat/publisher/pipeline/processor.go @@ -29,73 +29,76 @@ type processorFn struct { // // Pipeline (C=client, P=pipeline) // -// 1. (P) extract EventMetadataKey fields + tags (to be removed in favor of 4) -// 2. (P) generalize/normalize event -// 3. (P) add beats metadata (name, hostname, version) -// 4. (C) add Meta from client Config to event.Meta -// 5. (C) add Fields from client config to event.Fields -// 6. (P) add pipeline fields + tags -// 7. (C) add client fields + tags -// 8. (P/C) apply EventMetadataKey fields + tags (to be removed in favor of 4) -// 9. (C) client processors list -// 10. (P) pipeline processors list -// 11. (P) (if publish/debug enabled) log event -// 12. (P) (if output disabled) dropEvent +// 1. (P) generalize/normalize event +// 2. (C) add Meta from client Config to event.Meta +// 3. (C) add Fields from client config to event.Fields +// 4. (P) add pipeline fields + tags +// 5. (C) add client fields + tags +// 6. (C) client processors list +// 7. (P) add beats metadata +// 8. (P) pipeline processors list +// 9. (P) (if publish/debug enabled) log event +// 10. (P) (if output disabled) dropEvent func (p *Pipeline) newProcessorPipeline( config beat.ClientConfig, ) beat.Processor { - processors := &program{title: "processPipeline"} + var ( + // pipeline processors + processors = &program{title: "processPipeline"} - global := p.processors + // client fields and metadata + clientMeta = config.Meta + localProcessors = makeClientProcessors(config) - // setup 1: extract EventMetadataKey fields + tags - processors.add(preEventUserAnnotateProcessor) + // pipeline global + global = p.processors + ) - // setup 2 and 3: generalize/normalize output (P) + needsCopy := localProcessors != nil || global.processors != nil + + // setup 1: generalize/normalize output (P) processors.add(generalizeProcessor) - processors.add(global.beatMetaProcessor) - // setup 4: add Meta from client config - if m := config.Meta; len(m) > 0 { - processors.add(clientEventMeta(m)) + // setup 2: add Meta from client config (C) + if m := clientMeta; len(m) > 0 { + processors.add(clientEventMeta(m, needsCopy)) } - // setup 5: add Fields from client config - if m := config.Fields; len(m) > 0 { - processors.add(clientEventFields(m)) + // setup 4, 5: pipeline tags + client tags + var tags []string + tags = append(tags, global.tags...) + tags = append(tags, config.EventMetadata.Tags...) + if len(tags) > 0 { + processors.add(makeAddTagsProcessor("tags", tags)) } - // setup 6: add event fields + tags (P) - processors.add(global.eventMetaProcessor) - - // setup 7: add fields + tags (C) - if em := config.EventMetadata; len(em.Fields) > 0 || len(em.Tags) > 0 { - processors.add(eventAnnotateProcessor(em)) + // setup 3, 4, 5: client config fields + pipeline fields + client fields + fields := config.Fields.Clone() + fields.DeepUpdate(global.fields) + if em := config.EventMetadata; len(em.Fields) > 0 { + common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot) + } + if len(fields) > 0 { + processors.add(makeAddFieldsProcessor("fields", fields, needsCopy)) } - // setup 8: apply EventMetadata fields + tags - processors.add(eventUserAnnotateProcessor) - - // setup 9: client processors (C) - if procs := config.Processor; procs != nil { - if lst := procs.All(); len(lst) > 0 { + // setup 5: client processor list + processors.add(localProcessors) - processors.add(&program{ - title: "client", - list: lst, - }) - } + // setup 6: add beats metadata + if meta := global.beatsMeta; len(meta) > 0 { + processors.add(makeAddFieldsProcessor("beatsMeta", meta, needsCopy)) } - // setup 10: pipeline processors (P) + // setup 7: pipeline processors list processors.add(global.processors) - // setup 11: debug print final event (P) + // setup 9: debug print final event (P) if logp.IsDebug("publish") { processors.add(debugPrintProcessor()) } - // setup 12: drop all events if outputs are disabled + // setup 10: drop all events if outputs are disabled (P) if global.disabled { processors.add(dropDisabledProcessor) } @@ -205,67 +208,41 @@ func eventAnnotateProcessor(eventMeta common.EventMetadata) *processorFn { }) } -func clientEventMeta(meta common.MapStr) *processorFn { - return newAnnotateProcessor("@metadata", func(event *beat.Event) { - if event.Meta == nil { - event.Meta = meta.Clone() - } else { - event.Meta = event.Meta.Clone() - event.Meta.DeepUpdate(meta.Clone()) - } - }) +func clientEventMeta(meta common.MapStr, needsCopy bool) *processorFn { + fn := func(event *beat.Event) { addMeta(event, meta) } + if needsCopy { + fn = func(event *beat.Event) { addMeta(event, meta.Clone()) } + } + return newAnnotateProcessor("@metadata", fn) } -func clientEventFields(fields common.MapStr) *processorFn { - return newAnnotateProcessor("globalFields", func(event *beat.Event) { - event.Fields.DeepUpdate(fields.Clone()) - }) +func addMeta(event *beat.Event, meta common.MapStr) { + if event.Meta == nil { + event.Meta = meta + } else { + event.Meta.Clone() + event.Meta.DeepUpdate(meta) + } } -// TODO: remove var-section. Keep for backwards compatibility with old publisher API. -// Remove after updating all beats to new publisher API. -// Note: this functionality is used by filebeat/winlogbeat, so prospector/harvesters -// can apply fields to events after generating the event type. -// This functionality will be removed, in favor of harvesters publishing -// event to a beat.Client with properly setup processor -var ( - preEventUserAnnotateProcessor = newAnnotateProcessor("annotateEventUserPre", func(event *beat.Event) { - const key = common.EventMetadataKey - val, exists := event.Fields[key] - if !exists { - return - } - - delete(event.Fields, key) +func pipelineEventFields(fields common.MapStr, copy bool) *processorFn { + return makeAddFieldsProcessor("pipelineFields", fields, copy) +} - if _, ok := val.(common.EventMetadata); ok { - if event.Meta == nil { - event.Meta = common.MapStr{} - } - event.Meta[key] = val - } +func makeAddTagsProcessor(name string, tags []string) *processorFn { + return newAnnotateProcessor(name, func(event *beat.Event) { + common.AddTags(event.Fields, tags) }) +} - eventUserAnnotateProcessor = newAnnotateProcessor("annotateEventUser", func(event *beat.Event) { - const key = common.EventMetadataKey - - tmp, ok := event.Meta[key] - if !ok { - return - } - - delete(event.Meta, key) - if len(event.Meta) == 0 { - event.Meta = nil - } +func makeAddFieldsProcessor(name string, fields common.MapStr, copy bool) *processorFn { + fn := func(event *beat.Event) { event.Fields.DeepUpdate(fields) } + if copy { + fn = func(event *beat.Event) { event.Fields.DeepUpdate(fields.Clone()) } + } - eventMeta := tmp.(common.EventMetadata) - common.AddTags(event.Fields, eventMeta.Tags) - if fields := eventMeta.Fields; len(fields) > 0 { - common.MergeFields(event.Fields, fields.Clone(), eventMeta.FieldsUnderRoot) - } - }) -) + return newAnnotateProcessor(name, fn) +} func debugPrintProcessor() *processorFn { // ensure only one go-routine is using the encoder (in case @@ -286,3 +263,15 @@ func debugPrintProcessor() *processorFn { return event, nil }) } + +func makeClientProcessors(config beat.ClientConfig) processors.Processor { + procs := config.Processor + if procs == nil || len(procs.All()) == 0 { + return nil + } + + return &program{ + title: "client", + list: procs.All(), + } +} diff --git a/metricbeat/mb/module/event.go b/metricbeat/mb/module/event.go index 3b3c1bd0e938..c9434cdff153 100644 --- a/metricbeat/mb/module/event.go +++ b/metricbeat/mb/module/event.go @@ -78,7 +78,6 @@ func (b EventBuilder) Build() (beat.Event, error) { event := beat.Event{ Timestamp: time.Time(timestamp), Fields: common.MapStr{ - // common.EventMetadataKey: b.metadata, b.ModuleName: moduleEvent, "metricset": metricsetData, }, diff --git a/metricbeat/mb/testing/data_generator.go b/metricbeat/mb/testing/data_generator.go index 1fe83e509777..77ad9922c844 100644 --- a/metricbeat/mb/testing/data_generator.go +++ b/metricbeat/mb/testing/data_generator.go @@ -85,9 +85,6 @@ func CreateFullEvent(ms mb.MetricSet, metricSetData common.MapStr) beat.Event { "hostname": "host.example.com", } - // Delete meta data as not needed for the event output here. - delete(fullEvent.Fields, common.EventMetadataKey) - return fullEvent }