diff --git a/heartbeat/eventext/eventext.go b/heartbeat/eventext/eventext.go index daf25ab9602a..675fe191c910 100644 --- a/heartbeat/eventext/eventext.go +++ b/heartbeat/eventext/eventext.go @@ -25,8 +25,30 @@ import ( // MergeEventFields merges the given common.MapStr into the given Event's Fields. func MergeEventFields(e *beat.Event, merge common.MapStr) { if e.Fields != nil { - e.Fields.DeepUpdate(merge) + e.Fields.DeepUpdate(merge.Clone()) } else { - e.Fields = merge + e.Fields = merge.Clone() } } + +// EventCancelledMetaKey is the path to the @metadata key marking an event as cancelled. +const EventCancelledMetaKey = "__hb_evt_cancel__" + +// CancelEvent marks the event as cancelled. Downstream consumers of it should not emit nor output this event. +func CancelEvent(event *beat.Event) { + if event != nil { + if event.Meta == nil { + event.Meta = common.MapStr{} + } + event.Meta.Put(EventCancelledMetaKey, true) + } +} + +// IsEventCancelled checks for the marker left by CancelEvent. +func IsEventCancelled(event *beat.Event) bool { + if event == nil || event.Meta == nil { + return false + } + v, err := event.Meta.GetValue(EventCancelledMetaKey) + return err == nil && v == true +} diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index 43e4a8056fa1..6ea1257339ba 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -22,6 +22,7 @@ import ( "github.com/pkg/errors" + "github.com/elastic/beats/heartbeat/eventext" "github.com/elastic/beats/heartbeat/monitors/jobs" "github.com/elastic/beats/heartbeat/scheduler" "github.com/elastic/beats/heartbeat/scheduler/schedule" @@ -86,42 +87,7 @@ func (e ProcessorsError) Error() string { func (t *configuredJob) prepareSchedulerJob(job jobs.Job) scheduler.TaskFunc { return func() []scheduler.TaskFunc { - event := &beat.Event{ - Fields: common.MapStr{}, - } - next, err := job(event) - hasContinuations := len(next) > 0 - - if err != nil { - logp.Err("Job %v failed with: ", err) - } - - if event != nil && event.Fields != nil { - // If continuations are present we defensively publish a clone of the event - // in the chance that the event shares underlying data with the events for continuations - // This prevents races where the pipeline publish could accidentally alter multiple events. - if hasContinuations { - clone := beat.Event{ - Timestamp: event.Timestamp, - Meta: event.Meta.Clone(), - Fields: event.Fields.Clone(), - } - t.client.Publish(clone) - } else { - // no clone needed if no continuations - t.client.Publish(*event) - } - } - - if len(next) == 0 { - return nil - } - - continuations := make([]scheduler.TaskFunc, len(next)) - for i, n := range next { - continuations[i] = t.prepareSchedulerJob(n) - } - return continuations + return runPublishJob(job, t.client) } } @@ -166,3 +132,45 @@ func (t *configuredJob) Stop() { t.client.Close() } } + +func runPublishJob(job jobs.Job, client beat.Client) []scheduler.TaskFunc { + event := &beat.Event{ + Fields: common.MapStr{}, + } + + next, err := job(event) + if err != nil { + logp.Err("Job %v failed with: ", err) + } + + hasContinuations := len(next) > 0 + + if event.Fields != nil && !eventext.IsEventCancelled(event) { + // If continuations are present we defensively publish a clone of the event + // in the chance that the event shares underlying data with the events for continuations + // This prevents races where the pipeline publish could accidentally alter multiple events. + if hasContinuations { + clone := beat.Event{ + Timestamp: event.Timestamp, + Meta: event.Meta.Clone(), + Fields: event.Fields.Clone(), + } + client.Publish(clone) + } else { + // no clone needed if no continuations + client.Publish(*event) + } + } + + if len(next) == 0 { + return nil + } + + continuations := make([]scheduler.TaskFunc, len(next)) + for i, n := range next { + continuations[i] = func() []scheduler.TaskFunc { + return runPublishJob(n, client) + } + } + return continuations +} diff --git a/heartbeat/monitors/task_test.go b/heartbeat/monitors/task_test.go new file mode 100644 index 000000000000..56e832dc4a3d --- /dev/null +++ b/heartbeat/monitors/task_test.go @@ -0,0 +1,96 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package monitors + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/heartbeat/eventext" + "github.com/elastic/beats/heartbeat/monitors/jobs" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/mapval" +) + +func Test_runPublishJob(t *testing.T) { + simpleJob := func(event *beat.Event) (j []jobs.Job, e error) { + eventext.MergeEventFields(event, common.MapStr{"foo": "bar"}) + return nil, nil + } + + testCases := []struct { + name string + job jobs.Job + validators []mapval.Validator + }{ + { + "simple", + simpleJob, + []mapval.Validator{ + mapval.MustCompile(mapval.Map{"foo": "bar"}), + }, + }, + { + "one cont", + func(event *beat.Event) (j []jobs.Job, e error) { + simpleJob(event) + return []jobs.Job{simpleJob}, nil + }, + []mapval.Validator{ + mapval.MustCompile(mapval.Map{"foo": "bar"}), + mapval.MustCompile(mapval.Map{"foo": "bar"}), + }, + }, + { + "cancelled cont", + func(event *beat.Event) (j []jobs.Job, e error) { + eventext.CancelEvent(event) + return []jobs.Job{simpleJob}, nil + }, + []mapval.Validator{ + mapval.MustCompile(mapval.Map{"foo": "bar"}), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + client := &MockBeatClient{} + queue := runPublishJob(tc.job, client) + for { + if len(queue) == 0 { + break + } + tf := queue[0] + queue = queue[1:] + conts := tf() + for _, cont := range conts { + queue = append(queue, cont) + } + } + client.Close() + + require.Len(t, client.publishes, len(tc.validators)) + for idx, event := range client.publishes { + mapval.Test(t, tc.validators[idx], event.Fields) + } + }) + } +} diff --git a/heartbeat/monitors/util.go b/heartbeat/monitors/util.go index fa48113ec72a..df32a928aef9 100644 --- a/heartbeat/monitors/util.go +++ b/heartbeat/monitors/util.go @@ -23,6 +23,7 @@ import ( "net" "time" + "github.com/elastic/beats/heartbeat/eventext" "github.com/elastic/beats/heartbeat/look" "github.com/elastic/beats/heartbeat/monitors/jobs" "github.com/elastic/beats/heartbeat/monitors/wrappers" @@ -211,7 +212,11 @@ func makeByHostAllIPJob( ipFields := resolveIPEvent(ip.String(), resolveRTT) cont[i] = wrappers.WithFields(ipFields, pingFactory(addr)) } - return cont, nil + // Ideally we would test this invocation. This function however is really hard to to test given all the extra context it takes in + // In a future refactor we could perhaps test that this in correctly invoked. + eventext.CancelEvent(event) + + return cont, err } } diff --git a/heartbeat/monitors/wrappers/monitors.go b/heartbeat/monitors/wrappers/monitors.go index ad05d22c5e98..4de602505820 100644 --- a/heartbeat/monitors/wrappers/monitors.go +++ b/heartbeat/monitors/wrappers/monitors.go @@ -155,13 +155,18 @@ func makeAddSummary() jobs.JobWrapper { state.mtx.Lock() defer state.mtx.Unlock() - // After each job - eventStatus, _ := event.GetValue("monitor.status") - if eventStatus == "up" { - state.up++ - } else { - state.down++ + // If the event is cancelled we don't record it as being either up or down since + // we discard the event anyway. + if !eventext.IsEventCancelled(event) { + // After each job + eventStatus, _ := event.GetValue("monitor.status") + if eventStatus == "up" { + state.up++ + } else { + state.down++ + } } + // No error check needed here event.PutValue("monitor.check_group", state.checkGroup) diff --git a/heartbeat/monitors/wrappers/monitors_test.go b/heartbeat/monitors/wrappers/monitors_test.go index 00755b4c6e22..b56c83a95d05 100644 --- a/heartbeat/monitors/wrappers/monitors_test.go +++ b/heartbeat/monitors/wrappers/monitors_test.go @@ -39,10 +39,11 @@ type fields struct { } type testDef struct { - name string - fields fields - jobs []jobs.Job - want []mapval.Validator + name string + fields fields + jobs []jobs.Job + want []mapval.Validator + metaWant []mapval.Validator } func testCommonWrap(t *testing.T, tt testDef) { @@ -52,9 +53,16 @@ func testCommonWrap(t *testing.T, tt testDef) { results, err := jobs.ExecJobsAndConts(t, wrapped) assert.NoError(t, err) + require.Equal(t, len(results), len(tt.want), "Expected test def wants to correspond exactly to number results.") for idx, r := range results { t.Run(fmt.Sprintf("result at index %d", idx), func(t *testing.T) { - mapval.Test(t, mapval.Strict(tt.want[idx]), r.Fields) + want := tt.want[idx] + mapval.Test(t, mapval.Strict(want), r.Fields) + + if tt.metaWant != nil { + metaWant := tt.metaWant[idx] + mapval.Test(t, mapval.Strict(metaWant), r.Meta) + } }) } }) @@ -81,6 +89,7 @@ func TestSimpleJob(t *testing.T) { }), summaryValidator(1, 0), )}, + nil, }) } @@ -114,6 +123,7 @@ func TestErrorJob(t *testing.T) { errorJobValidator, summaryValidator(0, 1), )}, + nil, }) } @@ -144,6 +154,7 @@ func TestMultiJobNoConts(t *testing.T) { fields, []jobs.Job{makeURLJob(t, "http://foo.com"), makeURLJob(t, "http://bar.com")}, []mapval.Validator{validator("http://foo.com"), validator("http://bar.com")}, + nil, }) } @@ -201,6 +212,76 @@ func TestMultiJobConts(t *testing.T) { summaryValidator(2, 0), ), }, + nil, + }) +} + +func TestMultiJobContsCancelledEvents(t *testing.T) { + fields := fields{"myid", "myname", "mytyp"} + + uniqScope := mapval.ScopedIsUnique() + + makeContJob := func(t *testing.T, u string) jobs.Job { + return func(event *beat.Event) ([]jobs.Job, error) { + eventext.MergeEventFields(event, common.MapStr{"cont": "1st"}) + eventext.CancelEvent(event) + u, err := url.Parse(u) + require.NoError(t, err) + eventext.MergeEventFields(event, common.MapStr{"url": URLFields(u)}) + return []jobs.Job{ + func(event *beat.Event) ([]jobs.Job, error) { + eventext.MergeEventFields(event, common.MapStr{"cont": "2nd"}) + eventext.MergeEventFields(event, common.MapStr{"url": URLFields(u)}) + return nil, nil + }, + }, nil + } + } + + contJobValidator := func(u string, msg string) mapval.Validator { + return mapval.Compose( + urlValidator(t, u), + mapval.MustCompile(mapval.Map{"cont": msg}), + mapval.MustCompile(mapval.Map{ + "monitor": mapval.Map{ + "duration.us": mapval.IsDuration, + "id": uniqScope.IsUniqueTo(u), + "name": fields.name, + "type": fields.typ, + "status": "up", + "check_group": uniqScope.IsUniqueTo(u), + }, + }), + ) + } + + metaCancelledValidator := mapval.MustCompile(mapval.Map{eventext.EventCancelledMetaKey: true}) + testCommonWrap(t, testDef{ + "multi-job-continuations", + fields, + []jobs.Job{makeContJob(t, "http://foo.com"), makeContJob(t, "http://bar.com")}, + []mapval.Validator{ + mapval.Compose( + contJobValidator("http://foo.com", "1st"), + ), + mapval.Compose( + contJobValidator("http://foo.com", "2nd"), + summaryValidator(1, 0), + ), + mapval.Compose( + contJobValidator("http://bar.com", "1st"), + ), + mapval.Compose( + contJobValidator("http://bar.com", "2nd"), + summaryValidator(1, 0), + ), + }, + []mapval.Validator{ + metaCancelledValidator, + mapval.MustCompile(mapval.Map{}), + metaCancelledValidator, + mapval.MustCompile(mapval.Map{}), + }, }) }