From bd14dda1f3c7b24735d4332737b5411b575e5715 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Mon, 22 Apr 2019 22:07:34 -0500 Subject: [PATCH 01/14] Initial concept for fixing 'all' pings' Fixes https://github.com/elastic/beats/issues/11737 --- heartbeat/monitors/util.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/heartbeat/monitors/util.go b/heartbeat/monitors/util.go index fa48113ec72..88107446e15 100644 --- a/heartbeat/monitors/util.go +++ b/heartbeat/monitors/util.go @@ -211,7 +211,9 @@ func makeByHostAllIPJob( ipFields := resolveIPEvent(ip.String(), resolveRTT) cont[i] = wrappers.WithFields(ipFields, pingFactory(addr)) } - return cont, nil + firstIPJob := cont[0] + _, err = firstIPJob(event) + return cont[1 : len(ips)-1], err } } From d92b670eeba405b24a2cb29678f3e40dd1d6da92 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Tue, 23 Apr 2019 15:07:46 -0500 Subject: [PATCH 02/14] Add cancelled tasks --- heartbeat/eventext/eventext.go | 17 +++++++++++++++++ heartbeat/monitors/task.go | 4 +++- heartbeat/monitors/util.go | 8 +++++--- heartbeat/monitors/wrappers/monitors.go | 17 +++++++++++------ 4 files changed, 36 insertions(+), 10 deletions(-) diff --git a/heartbeat/eventext/eventext.go b/heartbeat/eventext/eventext.go index daf25ab9602..17b70a0f747 100644 --- a/heartbeat/eventext/eventext.go +++ b/heartbeat/eventext/eventext.go @@ -30,3 +30,20 @@ func MergeEventFields(e *beat.Event, merge common.MapStr) { e.Fields = merge } } + +// Marks the event as cancelled in the metadata. +const EVENT_CANCELLED_META_KEY = "meta.__hb_evt_cancel__" + +func CancelEvent(event *beat.Event) { + if event != nil { + event.PutValue(EVENT_CANCELLED_META_KEY, true) + } +} + +func IsEventCancelled(event *beat.Event) bool { + if event == nil { + return false + } + v, err := event.GetValue(EVENT_CANCELLED_META_KEY) + return err == nil && v == true +} diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index 43e4a8056fa..0b03050f1c2 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -20,6 +20,8 @@ package monitors import ( "fmt" + "github.com/elastic/beats/heartbeat/eventext" + "github.com/pkg/errors" "github.com/elastic/beats/heartbeat/monitors/jobs" @@ -96,7 +98,7 @@ func (t *configuredJob) prepareSchedulerJob(job jobs.Job) scheduler.TaskFunc { logp.Err("Job %v failed with: ", err) } - if event != nil && event.Fields != nil { + if event != nil && event.Fields != nil && !eventext.IsEventCancelled(event) { // If continuations are present we defensively publish a clone of the event // in the chance that the event shares underlying data with the events for continuations // This prevents races where the pipeline publish could accidentally alter multiple events. diff --git a/heartbeat/monitors/util.go b/heartbeat/monitors/util.go index 88107446e15..8b86f9797ea 100644 --- a/heartbeat/monitors/util.go +++ b/heartbeat/monitors/util.go @@ -23,6 +23,8 @@ import ( "net" "time" + "github.com/elastic/beats/heartbeat/eventext" + "github.com/elastic/beats/heartbeat/look" "github.com/elastic/beats/heartbeat/monitors/jobs" "github.com/elastic/beats/heartbeat/monitors/wrappers" @@ -211,9 +213,9 @@ func makeByHostAllIPJob( ipFields := resolveIPEvent(ip.String(), resolveRTT) cont[i] = wrappers.WithFields(ipFields, pingFactory(addr)) } - firstIPJob := cont[0] - _, err = firstIPJob(event) - return cont[1 : len(ips)-1], err + eventext.CancelEvent(event) + + return cont, err } } diff --git a/heartbeat/monitors/wrappers/monitors.go b/heartbeat/monitors/wrappers/monitors.go index ad05d22c5e9..4de60250582 100644 --- a/heartbeat/monitors/wrappers/monitors.go +++ b/heartbeat/monitors/wrappers/monitors.go @@ -155,13 +155,18 @@ func makeAddSummary() jobs.JobWrapper { state.mtx.Lock() defer state.mtx.Unlock() - // After each job - eventStatus, _ := event.GetValue("monitor.status") - if eventStatus == "up" { - state.up++ - } else { - state.down++ + // If the event is cancelled we don't record it as being either up or down since + // we discard the event anyway. + if !eventext.IsEventCancelled(event) { + // After each job + eventStatus, _ := event.GetValue("monitor.status") + if eventStatus == "up" { + state.up++ + } else { + state.down++ + } } + // No error check needed here event.PutValue("monitor.check_group", state.checkGroup) From 49b1347a8eb143f001c59a7b6b233236918cbd8f Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Tue, 23 Apr 2019 16:23:29 -0500 Subject: [PATCH 03/14] Moar --- heartbeat/monitors/task.go | 78 ++++++++-------- heartbeat/monitors/task_test.go | 97 ++++++++++++++++++++ heartbeat/monitors/wrappers/monitors_test.go | 68 +++++++++++++- 3 files changed, 206 insertions(+), 37 deletions(-) create mode 100644 heartbeat/monitors/task_test.go diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index 0b03050f1c2..4ea3d891bb0 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -88,42 +88,7 @@ func (e ProcessorsError) Error() string { func (t *configuredJob) prepareSchedulerJob(job jobs.Job) scheduler.TaskFunc { return func() []scheduler.TaskFunc { - event := &beat.Event{ - Fields: common.MapStr{}, - } - next, err := job(event) - hasContinuations := len(next) > 0 - - if err != nil { - logp.Err("Job %v failed with: ", err) - } - - if event != nil && event.Fields != nil && !eventext.IsEventCancelled(event) { - // If continuations are present we defensively publish a clone of the event - // in the chance that the event shares underlying data with the events for continuations - // This prevents races where the pipeline publish could accidentally alter multiple events. - if hasContinuations { - clone := beat.Event{ - Timestamp: event.Timestamp, - Meta: event.Meta.Clone(), - Fields: event.Fields.Clone(), - } - t.client.Publish(clone) - } else { - // no clone needed if no continuations - t.client.Publish(*event) - } - } - - if len(next) == 0 { - return nil - } - - continuations := make([]scheduler.TaskFunc, len(next)) - for i, n := range next { - continuations[i] = t.prepareSchedulerJob(n) - } - return continuations + return runPublishJob(job, t.client) } } @@ -168,3 +133,44 @@ func (t *configuredJob) Stop() { t.client.Close() } } + +func runPublishJob(job jobs.Job, client beat.Client) []scheduler.TaskFunc { + event := &beat.Event{ + Fields: common.MapStr{}, + } + next, err := job(event) + hasContinuations := len(next) > 0 + + if err != nil { + logp.Err("Job %v failed with: ", err) + } + + if event.Fields != nil && !eventext.IsEventCancelled(event) { + // If continuations are present we defensively publish a clone of the event + // in the chance that the event shares underlying data with the events for continuations + // This prevents races where the pipeline publish could accidentally alter multiple events. + if hasContinuations { + clone := beat.Event{ + Timestamp: event.Timestamp, + Meta: event.Meta.Clone(), + Fields: event.Fields.Clone(), + } + client.Publish(clone) + } else { + // no clone needed if no continuations + client.Publish(*event) + } + } + + if len(next) == 0 { + return nil + } + + continuations := make([]scheduler.TaskFunc, len(next)) + for i, n := range next { + continuations[i] = func() []scheduler.TaskFunc { + return runPublishJob(n, client) + } + } + return continuations +} diff --git a/heartbeat/monitors/task_test.go b/heartbeat/monitors/task_test.go new file mode 100644 index 00000000000..80b1073bd55 --- /dev/null +++ b/heartbeat/monitors/task_test.go @@ -0,0 +1,97 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package monitors + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/libbeat/common" + + "github.com/elastic/beats/heartbeat/eventext" + "github.com/elastic/beats/heartbeat/monitors/jobs" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common/mapval" +) + +func Test_runPublishJob(t *testing.T) { + simpleJob := func(event *beat.Event) (j []jobs.Job, e error) { + eventext.MergeEventFields(event, common.MapStr{"foo": "bar"}) + return nil, nil + } + + testCases := []struct { + name string + job jobs.Job + validators []mapval.Validator + }{ + { + "simple", + simpleJob, + []mapval.Validator{ + mapval.MustCompile(mapval.Map{"foo": "bar"}), + }, + }, + { + "one cont", + func(event *beat.Event) (j []jobs.Job, e error) { + simpleJob(event) + return []jobs.Job{simpleJob}, nil + }, + []mapval.Validator{ + mapval.MustCompile(mapval.Map{"foo": "bar"}), + mapval.MustCompile(mapval.Map{"foo": "bar"}), + }, + }, + { + "cancelled cont", + func(event *beat.Event) (j []jobs.Job, e error) { + eventext.CancelEvent(event) + return []jobs.Job{simpleJob}, nil + }, + []mapval.Validator{ + mapval.MustCompile(mapval.Map{"foo": "bar"}), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + client := &MockBeatClient{} + queue := runPublishJob(tc.job, client) + for { + if len(queue) == 0 { + break + } + tf := queue[0] + queue = queue[1:] + conts := tf() + for _, cont := range conts { + queue = append(queue, cont) + } + } + client.Close() + + require.Len(t, client.publishes, len(tc.validators)) + for idx, event := range client.publishes { + mapval.Test(t, tc.validators[idx], event.Fields) + } + }) + } +} diff --git a/heartbeat/monitors/wrappers/monitors_test.go b/heartbeat/monitors/wrappers/monitors_test.go index 00755b4c6e2..f9116a9a12a 100644 --- a/heartbeat/monitors/wrappers/monitors_test.go +++ b/heartbeat/monitors/wrappers/monitors_test.go @@ -52,9 +52,11 @@ func testCommonWrap(t *testing.T, tt testDef) { results, err := jobs.ExecJobsAndConts(t, wrapped) assert.NoError(t, err) + require.Equal(t, len(results), len(tt.want), "Expected test def wants to correspond exactly to number results.") for idx, r := range results { t.Run(fmt.Sprintf("result at index %d", idx), func(t *testing.T) { - mapval.Test(t, mapval.Strict(tt.want[idx]), r.Fields) + want := tt.want[idx] + mapval.Test(t, mapval.Strict(want), r.Fields) }) } }) @@ -204,6 +206,70 @@ func TestMultiJobConts(t *testing.T) { }) } +func TestMultiJobContsCancelledEvents(t *testing.T) { + fields := fields{"myid", "myname", "mytyp"} + + uniqScope := mapval.ScopedIsUnique() + + makeContJob := func(t *testing.T, u string) jobs.Job { + return func(event *beat.Event) ([]jobs.Job, error) { + eventext.MergeEventFields(event, common.MapStr{"cont": "1st"}) + eventext.CancelEvent(event) + u, err := url.Parse(u) + require.NoError(t, err) + eventext.MergeEventFields(event, common.MapStr{"url": URLFields(u)}) + return []jobs.Job{ + func(event *beat.Event) ([]jobs.Job, error) { + eventext.MergeEventFields(event, common.MapStr{"cont": "2nd"}) + eventext.MergeEventFields(event, common.MapStr{"url": URLFields(u)}) + return nil, nil + }, + }, nil + } + } + + contJobValidator := func(u string, msg string) mapval.Validator { + return mapval.Compose( + urlValidator(t, u), + mapval.MustCompile(mapval.Map{"cont": msg}), + mapval.MustCompile(mapval.Map{ + "monitor": mapval.Map{ + "duration.us": mapval.IsDuration, + "id": uniqScope.IsUniqueTo(u), + "name": fields.name, + "type": fields.typ, + "status": "up", + "check_group": uniqScope.IsUniqueTo(u), + }, + }), + ) + } + + testCommonWrap(t, testDef{ + "multi-job-continuations", + fields, + []jobs.Job{makeContJob(t, "http://foo.com"), makeContJob(t, "http://bar.com")}, + []mapval.Validator{ + mapval.Compose( + contJobValidator("http://foo.com", "1st"), + mapval.MustCompile(mapval.Map{eventext.EVENT_CANCELLED_META_KEY: true}), + ), + mapval.Compose( + contJobValidator("http://foo.com", "2nd"), + summaryValidator(1, 0), + ), + mapval.Compose( + contJobValidator("http://bar.com", "1st"), + mapval.MustCompile(mapval.Map{eventext.EVENT_CANCELLED_META_KEY: true}), + ), + mapval.Compose( + contJobValidator("http://bar.com", "2nd"), + summaryValidator(1, 0), + ), + }, + }) +} + func makeURLJob(t *testing.T, u string) jobs.Job { parsed, err := url.Parse(u) require.NoError(t, err) From fa73f91bb03b6dde978b023cd8dd5bd577cbcd6e Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Tue, 23 Apr 2019 16:32:20 -0500 Subject: [PATCH 04/14] Make hound happy --- heartbeat/eventext/eventext.go | 8 +++++--- heartbeat/monitors/wrappers/monitors_test.go | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/heartbeat/eventext/eventext.go b/heartbeat/eventext/eventext.go index 17b70a0f747..eecc3a999a7 100644 --- a/heartbeat/eventext/eventext.go +++ b/heartbeat/eventext/eventext.go @@ -32,18 +32,20 @@ func MergeEventFields(e *beat.Event, merge common.MapStr) { } // Marks the event as cancelled in the metadata. -const EVENT_CANCELLED_META_KEY = "meta.__hb_evt_cancel__" +const EventCancelledMetaKey = "meta.__hb_evt_cancel__" +// CancelEvent marks the event as cancelled. Downstream consumers of it should not emit nor output this event. func CancelEvent(event *beat.Event) { if event != nil { - event.PutValue(EVENT_CANCELLED_META_KEY, true) + event.PutValue(EventCancelledMetaKey, true) } } +// IsEventCancelled checks for the marker left by CancelEvent. func IsEventCancelled(event *beat.Event) bool { if event == nil { return false } - v, err := event.GetValue(EVENT_CANCELLED_META_KEY) + v, err := event.GetValue(EventCancelledMetaKey) return err == nil && v == true } diff --git a/heartbeat/monitors/wrappers/monitors_test.go b/heartbeat/monitors/wrappers/monitors_test.go index f9116a9a12a..db23fff683f 100644 --- a/heartbeat/monitors/wrappers/monitors_test.go +++ b/heartbeat/monitors/wrappers/monitors_test.go @@ -252,7 +252,7 @@ func TestMultiJobContsCancelledEvents(t *testing.T) { []mapval.Validator{ mapval.Compose( contJobValidator("http://foo.com", "1st"), - mapval.MustCompile(mapval.Map{eventext.EVENT_CANCELLED_META_KEY: true}), + mapval.MustCompile(mapval.Map{eventext.EventCancelledMetaKey: true}), ), mapval.Compose( contJobValidator("http://foo.com", "2nd"), @@ -260,7 +260,7 @@ func TestMultiJobContsCancelledEvents(t *testing.T) { ), mapval.Compose( contJobValidator("http://bar.com", "1st"), - mapval.MustCompile(mapval.Map{eventext.EVENT_CANCELLED_META_KEY: true}), + mapval.MustCompile(mapval.Map{eventext.EventCancelledMetaKey: true}), ), mapval.Compose( contJobValidator("http://bar.com", "2nd"), From b9e72bf59fa2ada8c356c380ff0e1592f75255a6 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Tue, 23 Apr 2019 16:39:31 -0500 Subject: [PATCH 05/14] Make hound happy --- heartbeat/eventext/eventext.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heartbeat/eventext/eventext.go b/heartbeat/eventext/eventext.go index eecc3a999a7..6f3143a98dd 100644 --- a/heartbeat/eventext/eventext.go +++ b/heartbeat/eventext/eventext.go @@ -31,7 +31,7 @@ func MergeEventFields(e *beat.Event, merge common.MapStr) { } } -// Marks the event as cancelled in the metadata. +// EventCalledMetaKey is the full path to the key marking an event as cancelled. const EventCancelledMetaKey = "meta.__hb_evt_cancel__" // CancelEvent marks the event as cancelled. Downstream consumers of it should not emit nor output this event. From 73166750a57f28cee6b3977ee58b940d05dbef2d Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Tue, 23 Apr 2019 16:42:06 -0500 Subject: [PATCH 06/14] Moar --- heartbeat/monitors/task_test.go | 3 +-- heartbeat/monitors/util.go | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/heartbeat/monitors/task_test.go b/heartbeat/monitors/task_test.go index 80b1073bd55..56e832dc4a3 100644 --- a/heartbeat/monitors/task_test.go +++ b/heartbeat/monitors/task_test.go @@ -22,11 +22,10 @@ import ( "github.com/stretchr/testify/require" - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/heartbeat/eventext" "github.com/elastic/beats/heartbeat/monitors/jobs" "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/mapval" ) diff --git a/heartbeat/monitors/util.go b/heartbeat/monitors/util.go index 8b86f9797ea..df32a928aef 100644 --- a/heartbeat/monitors/util.go +++ b/heartbeat/monitors/util.go @@ -24,7 +24,6 @@ import ( "time" "github.com/elastic/beats/heartbeat/eventext" - "github.com/elastic/beats/heartbeat/look" "github.com/elastic/beats/heartbeat/monitors/jobs" "github.com/elastic/beats/heartbeat/monitors/wrappers" @@ -213,6 +212,8 @@ func makeByHostAllIPJob( ipFields := resolveIPEvent(ip.String(), resolveRTT) cont[i] = wrappers.WithFields(ipFields, pingFactory(addr)) } + // Ideally we would test this invocation. This function however is really hard to to test given all the extra context it takes in + // In a future refactor we could perhaps test that this in correctly invoked. eventext.CancelEvent(event) return cont, err From 6a435452f4545184279d00cfe5b62298d5917817 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Wed, 24 Apr 2019 22:25:03 -0500 Subject: [PATCH 07/14] [Heartbeat] Fix race on WithFields --- heartbeat/monitors/wrappers/util.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/heartbeat/monitors/wrappers/util.go b/heartbeat/monitors/wrappers/util.go index 83778069d99..34e4fa53fde 100644 --- a/heartbeat/monitors/wrappers/util.go +++ b/heartbeat/monitors/wrappers/util.go @@ -33,12 +33,14 @@ func WithFields(fields common.MapStr, origJob jobs.Job) jobs.Job { return jobs.Wrap(origJob, Fields(fields)) } -// Fields is a JobWrapper that adds fields to a given event +// Fields is a JobWrapper that adds fields to a given event. func Fields(fields common.MapStr) jobs.JobWrapper { return func(origJob jobs.Job) jobs.Job { return func(event *beat.Event) ([]jobs.Job, error) { cont, err := origJob(event) - eventext.MergeEventFields(event, fields) + // Clone is necessary below for threadsafety since this may be invoked + // across continuations. + eventext.MergeEventFields(event, fields.Clone()) return cont, err } } From 7fdcb33c3f4ee14b8fa5621f90c84be7a9fd3cd8 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Thu, 25 Apr 2019 13:32:49 -0500 Subject: [PATCH 08/14] moar --- heartbeat/eventext/eventext.go | 17 ++++++++++------- heartbeat/heartbeat.yml | 10 ++-------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/heartbeat/eventext/eventext.go b/heartbeat/eventext/eventext.go index 6f3143a98dd..6435656c900 100644 --- a/heartbeat/eventext/eventext.go +++ b/heartbeat/eventext/eventext.go @@ -25,27 +25,30 @@ import ( // MergeEventFields merges the given common.MapStr into the given Event's Fields. func MergeEventFields(e *beat.Event, merge common.MapStr) { if e.Fields != nil { - e.Fields.DeepUpdate(merge) + e.Fields.DeepUpdate(merge.Clone()) } else { - e.Fields = merge + e.Fields = merge.Clone() } } -// EventCalledMetaKey is the full path to the key marking an event as cancelled. -const EventCancelledMetaKey = "meta.__hb_evt_cancel__" +// EventCalledMetaKey is the path to the @metadata key marking an event as cancelled. +const EventCancelledMetaKey = "__hb_evt_cancel__" // CancelEvent marks the event as cancelled. Downstream consumers of it should not emit nor output this event. func CancelEvent(event *beat.Event) { if event != nil { - event.PutValue(EventCancelledMetaKey, true) + if event.Meta == nil { + event.Meta = common.MapStr{} + } + event.Meta.Put(EventCancelledMetaKey, true) } } // IsEventCancelled checks for the marker left by CancelEvent. func IsEventCancelled(event *beat.Event) bool { - if event == nil { + if event == nil || event.Meta == nil { return false } - v, err := event.GetValue(EventCancelledMetaKey) + v, err := event.Meta.GetValue(EventCancelledMetaKey) return err == nil && v == true } diff --git a/heartbeat/heartbeat.yml b/heartbeat/heartbeat.yml index cfeaf7485e2..44bc41c7c36 100644 --- a/heartbeat/heartbeat.yml +++ b/heartbeat/heartbeat.yml @@ -22,16 +22,10 @@ heartbeat.config.monitors: # Configure monitors inline heartbeat.monitors: - type: http - - # List or urls to query - urls: ["http://localhost:9200"] - - # Configure task schedule + urls: ["http://elastic.co:9200"] + mode: all schedule: '@every 10s' - # Total test connection and data exchange timeout - #timeout: 16s - #==================== Elasticsearch template setting ========================== setup.template.settings: From 8bb663aef2d3d8032b59bd4230d369e0e3c94bec Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Thu, 25 Apr 2019 21:48:32 -0500 Subject: [PATCH 09/14] Revert "[Heartbeat] Fix race on WithFields" This reverts commit dd9fe116838bbbb5f60a2cc0921f1f5c7d071ce1. --- heartbeat/monitors/wrappers/util.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/heartbeat/monitors/wrappers/util.go b/heartbeat/monitors/wrappers/util.go index 34e4fa53fde..83778069d99 100644 --- a/heartbeat/monitors/wrappers/util.go +++ b/heartbeat/monitors/wrappers/util.go @@ -33,14 +33,12 @@ func WithFields(fields common.MapStr, origJob jobs.Job) jobs.Job { return jobs.Wrap(origJob, Fields(fields)) } -// Fields is a JobWrapper that adds fields to a given event. +// Fields is a JobWrapper that adds fields to a given event func Fields(fields common.MapStr) jobs.JobWrapper { return func(origJob jobs.Job) jobs.Job { return func(event *beat.Event) ([]jobs.Job, error) { cont, err := origJob(event) - // Clone is necessary below for threadsafety since this may be invoked - // across continuations. - eventext.MergeEventFields(event, fields.Clone()) + eventext.MergeEventFields(event, fields) return cont, err } } From fd4bf2a6b6c00240b298fb89f4a34efa38ccc216 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Thu, 25 Apr 2019 21:52:01 -0500 Subject: [PATCH 10/14] Revert heartbeat.yml --- heartbeat/heartbeat.yml | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/heartbeat/heartbeat.yml b/heartbeat/heartbeat.yml index 44bc41c7c36..cfeaf7485e2 100644 --- a/heartbeat/heartbeat.yml +++ b/heartbeat/heartbeat.yml @@ -22,10 +22,16 @@ heartbeat.config.monitors: # Configure monitors inline heartbeat.monitors: - type: http - urls: ["http://elastic.co:9200"] - mode: all + + # List or urls to query + urls: ["http://localhost:9200"] + + # Configure task schedule schedule: '@every 10s' + # Total test connection and data exchange timeout + #timeout: 16s + #==================== Elasticsearch template setting ========================== setup.template.settings: From dad49b907c28f679a770f2c6a5cfd40fe3f11bb2 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Thu, 25 Apr 2019 21:54:09 -0500 Subject: [PATCH 11/14] Fix comment --- heartbeat/eventext/eventext.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/heartbeat/eventext/eventext.go b/heartbeat/eventext/eventext.go index 6435656c900..675fe191c91 100644 --- a/heartbeat/eventext/eventext.go +++ b/heartbeat/eventext/eventext.go @@ -31,7 +31,7 @@ func MergeEventFields(e *beat.Event, merge common.MapStr) { } } -// EventCalledMetaKey is the path to the @metadata key marking an event as cancelled. +// EventCancelledMetaKey is the path to the @metadata key marking an event as cancelled. const EventCancelledMetaKey = "__hb_evt_cancel__" // CancelEvent marks the event as cancelled. Downstream consumers of it should not emit nor output this event. From 88a6f78d2e1855974cac24cbd3597ebfcddcae4a Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Mon, 29 Apr 2019 10:36:13 -0500 Subject: [PATCH 12/14] Fix ordering of error check --- heartbeat/monitors/task.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index 4ea3d891bb0..09df1173999 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -138,13 +138,14 @@ func runPublishJob(job jobs.Job, client beat.Client) []scheduler.TaskFunc { event := &beat.Event{ Fields: common.MapStr{}, } - next, err := job(event) - hasContinuations := len(next) > 0 + next, err := job(event) if err != nil { logp.Err("Job %v failed with: ", err) } + hasContinuations := len(next) > 0 + if event.Fields != nil && !eventext.IsEventCancelled(event) { // If continuations are present we defensively publish a clone of the event // in the chance that the event shares underlying data with the events for continuations From b8606768edfb223941c736e81a277e6f4078a0a7 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Mon, 29 Apr 2019 11:13:43 -0500 Subject: [PATCH 13/14] Fix import sorting --- heartbeat/monitors/task.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index 09df1173999..6ea1257339b 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -20,10 +20,9 @@ package monitors import ( "fmt" - "github.com/elastic/beats/heartbeat/eventext" - "github.com/pkg/errors" + "github.com/elastic/beats/heartbeat/eventext" "github.com/elastic/beats/heartbeat/monitors/jobs" "github.com/elastic/beats/heartbeat/scheduler" "github.com/elastic/beats/heartbeat/scheduler/schedule" From 61cdef97523017c9b642616c6926d070915dfdc8 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Tue, 30 Apr 2019 10:08:26 -0500 Subject: [PATCH 14/14] Test metadata, not fields for evt cancelled info --- heartbeat/monitors/wrappers/monitors_test.go | 27 +++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/heartbeat/monitors/wrappers/monitors_test.go b/heartbeat/monitors/wrappers/monitors_test.go index db23fff683f..b56c83a95d0 100644 --- a/heartbeat/monitors/wrappers/monitors_test.go +++ b/heartbeat/monitors/wrappers/monitors_test.go @@ -39,10 +39,11 @@ type fields struct { } type testDef struct { - name string - fields fields - jobs []jobs.Job - want []mapval.Validator + name string + fields fields + jobs []jobs.Job + want []mapval.Validator + metaWant []mapval.Validator } func testCommonWrap(t *testing.T, tt testDef) { @@ -57,6 +58,11 @@ func testCommonWrap(t *testing.T, tt testDef) { t.Run(fmt.Sprintf("result at index %d", idx), func(t *testing.T) { want := tt.want[idx] mapval.Test(t, mapval.Strict(want), r.Fields) + + if tt.metaWant != nil { + metaWant := tt.metaWant[idx] + mapval.Test(t, mapval.Strict(metaWant), r.Meta) + } }) } }) @@ -83,6 +89,7 @@ func TestSimpleJob(t *testing.T) { }), summaryValidator(1, 0), )}, + nil, }) } @@ -116,6 +123,7 @@ func TestErrorJob(t *testing.T) { errorJobValidator, summaryValidator(0, 1), )}, + nil, }) } @@ -146,6 +154,7 @@ func TestMultiJobNoConts(t *testing.T) { fields, []jobs.Job{makeURLJob(t, "http://foo.com"), makeURLJob(t, "http://bar.com")}, []mapval.Validator{validator("http://foo.com"), validator("http://bar.com")}, + nil, }) } @@ -203,6 +212,7 @@ func TestMultiJobConts(t *testing.T) { summaryValidator(2, 0), ), }, + nil, }) } @@ -245,6 +255,7 @@ func TestMultiJobContsCancelledEvents(t *testing.T) { ) } + metaCancelledValidator := mapval.MustCompile(mapval.Map{eventext.EventCancelledMetaKey: true}) testCommonWrap(t, testDef{ "multi-job-continuations", fields, @@ -252,7 +263,6 @@ func TestMultiJobContsCancelledEvents(t *testing.T) { []mapval.Validator{ mapval.Compose( contJobValidator("http://foo.com", "1st"), - mapval.MustCompile(mapval.Map{eventext.EventCancelledMetaKey: true}), ), mapval.Compose( contJobValidator("http://foo.com", "2nd"), @@ -260,13 +270,18 @@ func TestMultiJobContsCancelledEvents(t *testing.T) { ), mapval.Compose( contJobValidator("http://bar.com", "1st"), - mapval.MustCompile(mapval.Map{eventext.EventCancelledMetaKey: true}), ), mapval.Compose( contJobValidator("http://bar.com", "2nd"), summaryValidator(1, 0), ), }, + []mapval.Validator{ + metaCancelledValidator, + mapval.MustCompile(mapval.Map{}), + metaCancelledValidator, + mapval.MustCompile(mapval.Map{}), + }, }) }