Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #5149 to 6.0: Reorder processors in publisher pipeline #5165

Merged
merged 1 commit into from
Sep 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions libbeat/common/mapstr.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import (
// Event metadata constants. These keys are used within libbeat to identify
// metadata stored in an event.
const (
EventMetadataKey = "_event_metadata"
FieldsKey = "fields"
TagsKey = "tags"
FieldsKey = "fields"
TagsKey = "tags"
)

var (
Expand Down
76 changes: 46 additions & 30 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,13 @@ type pipelineProcessors struct {
// The pipeline's processor settings, used for
// constructing the client's complete processor
// pipeline on connect.
beatMetaProcessor beat.Processor
eventMetaProcessor beat.Processor
processors beat.Processor
disabled bool // disabled is set if outputs have been disabled via CLI
beatsMeta common.MapStr
fields common.MapStr
tags []string

processors beat.Processor

disabled bool // disabled is set if outputs have been disabled via CLI
}

// Settings is used to pass additional settings to a newly created pipeline instance.
Expand Down Expand Up @@ -132,39 +135,17 @@ func New(
out outputs.Group,
settings Settings,
) (*Pipeline, error) {
annotations := settings.Annotations
var err error

var beatMeta beat.Processor
if meta := annotations.Beat; meta != nil {
beatMeta = beatAnnotateProcessor(meta)
}

var eventMeta beat.Processor
if em := annotations.Event; len(em.Fields) > 0 || len(em.Tags) > 0 {
eventMeta = eventAnnotateProcessor(em)
}

var prog beat.Processor
if ps := settings.Processors; ps != nil && len(ps.List) > 0 {
tmp := &program{title: "global"}
for _, p := range ps.List {
tmp.add(p)
}
prog = tmp
}

log := defaultLogger
annotations := settings.Annotations
processors := settings.Processors
disabledOutput := settings.Disabled
p := &Pipeline{
logger: log,
waitCloseMode: settings.WaitCloseMode,
waitCloseTimeout: settings.WaitClose,
processors: pipelineProcessors{
beatMetaProcessor: beatMeta,
eventMetaProcessor: eventMeta,
processors: prog,
disabled: settings.Disabled,
},
processors: makePipelineProcessors(annotations, processors, disabledOutput),
}
p.ackBuilder = &pipelineEmptyACK{p}
p.ackActive = atomic.MakeBool(true)
Expand Down Expand Up @@ -382,3 +363,38 @@ func (e *waitCloser) dec(n int) {
// wait blocks until e.events.Wait() returns.
// NOTE(review): events is presumably a wait-group-like counter of in-flight
// events that dec() decrements — confirm against the waitCloser definition.
func (e *waitCloser) wait() {
e.events.Wait()
}

// makePipelineProcessors collects the pipeline-wide processor settings that
// are later combined with per-client settings when a client connects.
//
//   - processors: if non-nil and non-empty, every entry of processors.List is
//     added to a program titled "global".
//   - annotations.Beat: if non-nil, stored under the "beat" key in beatsMeta.
//   - annotations.Event.Fields: if non-empty, a clone is merged into a fresh
//     MapStr honoring FieldsUnderRoot.
//   - annotations.Event.Tags: if non-empty, stored as-is (not copied).
//   - disabled: copied into the result unchanged.
func makePipelineProcessors(
	annotations Annotations,
	processors *processors.Processors,
	disabled bool,
) pipelineProcessors {
	result := pipelineProcessors{disabled: disabled}

	// Global processor list.
	if processors != nil && len(processors.List) > 0 {
		global := &program{title: "global"}
		for _, proc := range processors.List {
			global.add(proc)
		}
		result.processors = global
	}

	// Beat metadata.
	if annotations.Beat != nil {
		result.beatsMeta = common.MapStr{"beat": annotations.Beat}
	}

	// Event fields and tags.
	eventMeta := annotations.Event
	if len(eventMeta.Fields) > 0 {
		merged := common.MapStr{}
		common.MergeFields(merged, eventMeta.Fields.Clone(), eventMeta.FieldsUnderRoot)
		result.fields = merged
	}
	if len(eventMeta.Tags) > 0 {
		result.tags = eventMeta.Tags
	}

	return result
}
183 changes: 86 additions & 97 deletions libbeat/publisher/pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,73 +29,76 @@ type processorFn struct {
//
// Pipeline (C=client, P=pipeline)
//
// 1. (P) extract EventMetadataKey fields + tags (to be removed in favor of 4)
// 2. (P) generalize/normalize event
// 3. (P) add beats metadata (name, hostname, version)
// 4. (C) add Meta from client Config to event.Meta
// 5. (C) add Fields from client config to event.Fields
// 6. (P) add pipeline fields + tags
// 7. (C) add client fields + tags
// 8. (P/C) apply EventMetadataKey fields + tags (to be removed in favor of 4)
// 9. (C) client processors list
// 10. (P) pipeline processors list
// 11. (P) (if publish/debug enabled) log event
// 12. (P) (if output disabled) dropEvent
// 1. (P) generalize/normalize event
// 2. (C) add Meta from client Config to event.Meta
// 3. (C) add Fields from client config to event.Fields
// 4. (P) add pipeline fields + tags
// 5. (C) add client fields + tags
// 6. (C) client processors list
// 7. (P) add beats metadata
// 8. (P) pipeline processors list
// 9. (P) (if publish/debug enabled) log event
// 10. (P) (if output disabled) dropEvent
func (p *Pipeline) newProcessorPipeline(
config beat.ClientConfig,
) beat.Processor {
processors := &program{title: "processPipeline"}
var (
// pipeline processors
processors = &program{title: "processPipeline"}

global := p.processors
// client fields and metadata
clientMeta = config.Meta
localProcessors = makeClientProcessors(config)

// setup 1: extract EventMetadataKey fields + tags
processors.add(preEventUserAnnotateProcessor)
// pipeline global
global = p.processors
)

// setup 2 and 3: generalize/normalize output (P)
needsCopy := localProcessors != nil || global.processors != nil

// setup 1: generalize/normalize output (P)
processors.add(generalizeProcessor)
processors.add(global.beatMetaProcessor)

// setup 4: add Meta from client config
if m := config.Meta; len(m) > 0 {
processors.add(clientEventMeta(m))
// setup 2: add Meta from client config (C)
if m := clientMeta; len(m) > 0 {
processors.add(clientEventMeta(m, needsCopy))
}

// setup 5: add Fields from client config
if m := config.Fields; len(m) > 0 {
processors.add(clientEventFields(m))
// setup 4, 5: pipeline tags + client tags
var tags []string
tags = append(tags, global.tags...)
tags = append(tags, config.EventMetadata.Tags...)
if len(tags) > 0 {
processors.add(makeAddTagsProcessor("tags", tags))
}

// setup 6: add event fields + tags (P)
processors.add(global.eventMetaProcessor)

// setup 7: add fields + tags (C)
if em := config.EventMetadata; len(em.Fields) > 0 || len(em.Tags) > 0 {
processors.add(eventAnnotateProcessor(em))
// setup 3, 4, 5: client config fields + pipeline fields + client fields
fields := config.Fields.Clone()
fields.DeepUpdate(global.fields)
if em := config.EventMetadata; len(em.Fields) > 0 {
common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot)
}
if len(fields) > 0 {
processors.add(makeAddFieldsProcessor("fields", fields, needsCopy))
}

// setup 8: apply EventMetadata fields + tags
processors.add(eventUserAnnotateProcessor)

// setup 9: client processors (C)
if procs := config.Processor; procs != nil {
if lst := procs.All(); len(lst) > 0 {
// setup 6: client processor list
processors.add(localProcessors)

processors.add(&program{
title: "client",
list: lst,
})
}
// setup 7: add beats metadata
if meta := global.beatsMeta; len(meta) > 0 {
processors.add(makeAddFieldsProcessor("beatsMeta", meta, needsCopy))
}

// setup 10: pipeline processors (P)
// setup 8: pipeline processors list
processors.add(global.processors)

// setup 11: debug print final event (P)
// setup 9: debug print final event (P)
if logp.IsDebug("publish") {
processors.add(debugPrintProcessor())
}

// setup 12: drop all events if outputs are disabled
// setup 10: drop all events if outputs are disabled (P)
if global.disabled {
processors.add(dropDisabledProcessor)
}
Expand Down Expand Up @@ -205,67 +208,41 @@ func eventAnnotateProcessor(eventMeta common.EventMetadata) *processorFn {
})
}

func clientEventMeta(meta common.MapStr) *processorFn {
return newAnnotateProcessor("@metadata", func(event *beat.Event) {
if event.Meta == nil {
event.Meta = meta.Clone()
} else {
event.Meta = event.Meta.Clone()
event.Meta.DeepUpdate(meta.Clone())
}
})
// clientEventMeta builds the "@metadata" annotate processor that applies the
// client-configured meta to each event via addMeta.
//
// When needsCopy is true, a fresh clone of meta is handed to addMeta for every
// event; otherwise the same meta map is passed for every event.
func clientEventMeta(meta common.MapStr, needsCopy bool) *processorFn {
	if needsCopy {
		return newAnnotateProcessor("@metadata", func(event *beat.Event) {
			addMeta(event, meta.Clone())
		})
	}
	return newAnnotateProcessor("@metadata", func(event *beat.Event) {
		addMeta(event, meta)
	})
}

func clientEventFields(fields common.MapStr) *processorFn {
return newAnnotateProcessor("globalFields", func(event *beat.Event) {
event.Fields.DeepUpdate(fields.Clone())
})
// addMeta merges meta into event.Meta.
//
// If the event has no metadata yet, meta itself becomes the event's metadata
// (no copy is made here). Otherwise the event's existing metadata is cloned
// and meta is deep-merged into the clone, so the map the event arrived with
// is never mutated.
func addMeta(event *beat.Event, meta common.MapStr) {
	if event.Meta == nil {
		event.Meta = meta
		return
	}

	// Clone returns a copy and leaves the receiver untouched, so the result
	// has to be assigned back. Calling it as a bare statement discarded the
	// copy and let DeepUpdate write into the original map.
	event.Meta = event.Meta.Clone()
	event.Meta.DeepUpdate(meta)
}

// TODO: remove var-section. Keep for backwards compatibility with old publisher API.
// Remove after updating all beats to new publisher API.
// Note: this functionality is used by filebeat/winlogbeat, so prospector/harvesters
// can apply fields to events after generating the event type.
// This functionality will be removed, in favor of harvesters publishing
// event to a beat.Client with properly setup processor
var (
preEventUserAnnotateProcessor = newAnnotateProcessor("annotateEventUserPre", func(event *beat.Event) {
const key = common.EventMetadataKey
val, exists := event.Fields[key]
if !exists {
return
}

delete(event.Fields, key)
// pipelineEventFields returns the processor that adds the given fields to
// every event. It delegates to makeAddFieldsProcessor under the fixed name
// "pipelineFields" and passes the copy flag through unchanged.
func pipelineEventFields(fields common.MapStr, copy bool) *processorFn {
	const name = "pipelineFields"
	return makeAddFieldsProcessor(name, fields, copy)
}

if _, ok := val.(common.EventMetadata); ok {
if event.Meta == nil {
event.Meta = common.MapStr{}
}
event.Meta[key] = val
}
// makeAddTagsProcessor returns an annotate processor with the given name
// that calls common.AddTags with the event's Fields and tags for each event.
func makeAddTagsProcessor(name string, tags []string) *processorFn {
	addTags := func(event *beat.Event) {
		common.AddTags(event.Fields, tags)
	}
	return newAnnotateProcessor(name, addTags)
}

eventUserAnnotateProcessor = newAnnotateProcessor("annotateEventUser", func(event *beat.Event) {
const key = common.EventMetadataKey

tmp, ok := event.Meta[key]
if !ok {
return
}

delete(event.Meta, key)
if len(event.Meta) == 0 {
event.Meta = nil
}
func makeAddFieldsProcessor(name string, fields common.MapStr, copy bool) *processorFn {
fn := func(event *beat.Event) { event.Fields.DeepUpdate(fields) }
if copy {
fn = func(event *beat.Event) { event.Fields.DeepUpdate(fields.Clone()) }
}

eventMeta := tmp.(common.EventMetadata)
common.AddTags(event.Fields, eventMeta.Tags)
if fields := eventMeta.Fields; len(fields) > 0 {
common.MergeFields(event.Fields, fields.Clone(), eventMeta.FieldsUnderRoot)
}
})
)
return newAnnotateProcessor(name, fn)
}

func debugPrintProcessor() *processorFn {
// ensure only one go-routine is using the encoder (in case
Expand All @@ -286,3 +263,15 @@ func debugPrintProcessor() *processorFn {
return event, nil
})
}

// makeClientProcessors wraps the processors configured on the client into a
// program titled "client".
//
// It returns a literal nil when the client has no processor list or the list
// is empty, so callers can test the result with a plain `!= nil`.
func makeClientProcessors(config beat.ClientConfig) processors.Processor {
	if config.Processor == nil {
		return nil
	}

	// Evaluate All() once and reuse the result instead of calling it a
	// second time to build the program.
	list := config.Processor.All()
	if len(list) == 0 {
		return nil
	}

	return &program{
		title: "client",
		list:  list,
	}
}
1 change: 0 additions & 1 deletion metricbeat/mb/module/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func (b EventBuilder) Build() (beat.Event, error) {
event := beat.Event{
Timestamp: time.Time(timestamp),
Fields: common.MapStr{
// common.EventMetadataKey: b.metadata,
b.ModuleName: moduleEvent,
"metricset": metricsetData,
},
Expand Down
3 changes: 0 additions & 3 deletions metricbeat/mb/testing/data_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ func CreateFullEvent(ms mb.MetricSet, metricSetData common.MapStr) beat.Event {
"hostname": "host.example.com",
}

// Delete meta data as not needed for the event output here.
delete(fullEvent.Fields, common.EventMetadataKey)

return fullEvent
}

Expand Down