diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ea10f9c2dac1..433d185c0c8e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -112,6 +112,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix NPE on some monitor configuration errors. {pull}11910[11910] - Fix NPEs / resource leaks when executing config checks. {pull}11165[11165] +- Fix duplicated IPs on `mode: all` monitors. {pull}12458[12458] *Journalbeat* diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index 6ea1257339ba..dc5e1831a338 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -138,12 +138,12 @@ func runPublishJob(job jobs.Job, client beat.Client) []scheduler.TaskFunc { Fields: common.MapStr{}, } - next, err := job(event) + conts, err := job(event) if err != nil { logp.Err("Job %v failed with: ", err) } - hasContinuations := len(next) > 0 + hasContinuations := len(conts) > 0 if event.Fields != nil && !eventext.IsEventCancelled(event) { // If continuations are present we defensively publish a clone of the event @@ -162,15 +162,20 @@ func runPublishJob(job jobs.Job, client beat.Client) []scheduler.TaskFunc { } } - if len(next) == 0 { + if !hasContinuations { return nil } - continuations := make([]scheduler.TaskFunc, len(next)) - for i, n := range next { - continuations[i] = func() []scheduler.TaskFunc { - return runPublishJob(n, client) + contTasks := make([]scheduler.TaskFunc, len(conts)) + for i, cont := range conts { + // Move the continuation into the local block scope + // This is important since execution is deferred + // Without this only the last continuation will be executed len(conts) times + localCont := cont + + contTasks[i] = func() []scheduler.TaskFunc { + return runPublishJob(localCont, client) } } - return continuations + return contTasks } diff --git a/heartbeat/monitors/task_test.go b/heartbeat/monitors/task_test.go index 56e832dc4a3d..218ee31bafab 100644 --- a/heartbeat/monitors/task_test.go +++ b/heartbeat/monitors/task_test.go @@ -30,10 +30,13 @@ import ( ) func Test_runPublishJob(t *testing.T) { - simpleJob := func(event *beat.Event) (j []jobs.Job, e error) { - eventext.MergeEventFields(event, common.MapStr{"foo": "bar"}) - return nil, nil + defineJob := func(fields common.MapStr) func(event *beat.Event) (j []jobs.Job, e error) { + return func(event *beat.Event) (j []jobs.Job, e error) { + eventext.MergeEventFields(event, fields) + return nil, nil + } } + simpleJob := defineJob(common.MapStr{"foo": "bar"}) testCases := []struct { name string @@ -58,6 +61,21 @@ func Test_runPublishJob(t *testing.T) { mapval.MustCompile(mapval.Map{"foo": "bar"}), }, }, + { + "multiple conts", + func(event *beat.Event) (j []jobs.Job, e error) { + simpleJob(event) + return []jobs.Job{ + defineJob(common.MapStr{"baz": "bot"}), + defineJob(common.MapStr{"blah": "blargh"}), + }, nil + }, + []mapval.Validator{ + mapval.MustCompile(mapval.Map{"foo": "bar"}), + mapval.MustCompile(mapval.Map{"baz": "bot"}), + mapval.MustCompile(mapval.Map{"blah": "blargh"}), + }, + }, { "cancelled cont", func(event *beat.Event) (j []jobs.Job, e error) {