From 6af2cd60e12344d7633b6fa1b2f420efbf58f3ba Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Fri, 4 May 2018 09:31:41 +0200 Subject: [PATCH] Fix race on shared maps in global fields (#6947) (#7011) * Fix race on shared maps in global fields On publish fields are added to an event in this order: - local/global configured fields - dynamic fields - "beat" metadata When merging the fields, shared structures must not be overwritten or updated concurrently. This is enforced by cloning the original fields structure before applying updates. This adds missing Clone operations if configured fields add new fields to the `beat` namespace or if dynamic fields are enabled. * Remove hard coded bool from testing (cherry picked from commit 71668239671c5334785ec5f68648fe0d900e8f56) --- CHANGELOG.asciidoc | 1 + libbeat/publisher/pipeline/processor.go | 35 +++++++++++++++++++------ 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index a6856b4c1c2c..4c3253bb6aa5 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -54,6 +54,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Remove double slashes in Windows service script. {pull}6491[6491] - Ensure Kubernetes labels/annotations don't break mapping {pull}6490[6490] - Ensure that the dashboard zip files can't contain files outside of the kibana directory. {pull}6921[6921] +- Fix map overwrite panics by cloning shared structs before doing the update. {pull}6947[6947] *Auditbeat* diff --git a/libbeat/publisher/pipeline/processor.go b/libbeat/publisher/pipeline/processor.go index 81d2a8d3426c..e8e68fbbd14d 100644 --- a/libbeat/publisher/pipeline/processor.go +++ b/libbeat/publisher/pipeline/processor.go @@ -81,11 +81,19 @@ func newProcessorPipeline( } if len(fields) > 0 { - processors.add(makeAddFieldsProcessor("fields", fields, needsCopy)) + // Enforce a copy of fields if dynamic fields are configured or beats + // metadata will be merged into the fields. + // With dynamic fields potentially changing at any time, we need to copy, + // so we do not change shared structures be accident. + fieldsNeedsCopy := needsCopy || config.DynamicFields != nil || fields["beat"] != nil + processors.add(makeAddFieldsProcessor("fields", fields, fieldsNeedsCopy)) } if config.DynamicFields != nil { - processors.add(makeAddDynMetaProcessor("dynamicFields", config.DynamicFields, needsCopy)) + checkCopy := func(m common.MapStr) bool { + return needsCopy || hasKey(m, "beat") + } + processors.add(makeAddDynMetaProcessor("dynamicFields", config.DynamicFields, checkCopy)) } // setup 5: client processor list @@ -250,13 +258,19 @@ func makeAddFieldsProcessor(name string, fields common.MapStr, copy bool) *proce return newAnnotateProcessor(name, fn) } -func makeAddDynMetaProcessor(name string, meta *common.MapStrPointer, copy bool) *processorFn { - fn := func(event *beat.Event) { event.Fields.DeepUpdate(meta.Get()) } - if copy { - fn = func(event *beat.Event) { event.Fields.DeepUpdate(meta.Get().Clone()) } - } +func makeAddDynMetaProcessor( + name string, + meta *common.MapStrPointer, + checkCopy func(m common.MapStr) bool, +) *processorFn { + return newAnnotateProcessor(name, func(event *beat.Event) { + dynFields := meta.Get() + if checkCopy(dynFields) { + dynFields = dynFields.Clone() + } - return newAnnotateProcessor(name, fn) + event.Fields.DeepUpdate(dynFields) + }) } func debugPrintProcessor(info beat.Info) *processorFn { @@ -290,3 +304,8 @@ func makeClientProcessors(config beat.ClientConfig) processors.Processor { list: procs.All(), } } + +func hasKey(m common.MapStr, key string) bool { + _, exists := m[key] + return exists +}