Skip to content

Commit

Permalink
[Heartbeat] Add monitor.timespan field (elastic#14778)
Browse files Browse the repository at this point in the history
This field is needed to optimize the overview page queries. By adding the time range this monitor is responsible for we can use a range query to substantially narrow the candidate document set in queries.

This takes over from the experiments in elastic#13672 and should be production quality. It only adds this single necessary field.
  • Loading branch information
andrewvc authored Dec 16, 2019
1 parent ac9434b commit 416aab0
Show file tree
Hide file tree
Showing 17 changed files with 222 additions and 51 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

- Enable `add_observer_metadata` processor in default config. {pull}11394[11394]
- Record HTTP body metadata and optionally contents in `http.response.body.*` fields. {pull}13022[13022]
- Add `monitor.timespan` field for optimized queries in kibana. {pull}13672[13672]
- Allow `hosts` to be used to configure http monitors {pull}13703[13703]
- google-pubsub input: ACK pub/sub message when acknowledged by publisher. {issue}13346[13346] {pull}14715[14715]
- Remove Beta label from google-pubsub input. {issue}13346[13346] {pull}14715[14715]
Expand Down
5 changes: 5 additions & 0 deletions heartbeat/_meta/fields.common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
description: >
A token unique to a simultaneously invoked group of checks as in the case where multiple IPs are checked for a single DNS entry.
- name: timespan
type: date_range
description: >
Time range this ping reported starting at the instant the check was started, ending at the start of the next scheduled check.
- key: summary
title: "Monitor summary"
description:
Expand Down
10 changes: 10 additions & 0 deletions heartbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,16 @@ type: keyword
--
*`monitor.timespan`*::
+
--
Time range this ping reported starting at the instant the check was started, ending at the start of the next scheduled check.
type: date_range
--
[[exported-fields-docker-processor]]
== Docker fields
Expand Down
28 changes: 17 additions & 11 deletions heartbeat/hbtest/hbtestutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"strings"
"testing"

"github.com/elastic/beats/heartbeat/hbtestllext"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/heartbeat/monitors/wrappers"
Expand Down Expand Up @@ -124,17 +126,21 @@ func BaseChecks(ip string, status string, typ string) validator.Validator {
} else {
ipCheck = isdef.Optional(isdef.IsEqual(ip))
}
return lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{
"ip": ipCheck,
"duration.us": isdef.IsDuration,
"status": status,
"id": isdef.IsNonEmptyString,
"name": isdef.IsString,
"type": typ,
"check_group": isdef.IsString,
},
})

return lookslike.Compose(
lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{
"ip": ipCheck,
"status": status,
"duration.us": isdef.IsDuration,
"id": isdef.IsNonEmptyString,
"name": isdef.IsString,
"type": typ,
"check_group": isdef.IsString,
},
}),
hbtestllext.MonitorTimespanValidator,
)
}

// SummaryChecks validates the "summary" field and its subfields.
Expand Down
35 changes: 35 additions & 0 deletions heartbeat/hbtestllext/isdefs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package hbtestllext

import (
"time"

"github.com/elastic/go-lookslike/isdef"
"github.com/elastic/go-lookslike/llpath"
"github.com/elastic/go-lookslike/llresult"
)

// IsTime checks that the value is a time.Time instance.
var IsTime = isdef.Is("time", func(path llpath.Path, v interface{}) *llresult.Results {
_, ok := v.(time.Time)
if !ok {
return llresult.SimpleResult(path, false, "expected a time.Time")
}
return llresult.ValidResult(path)
})
32 changes: 32 additions & 0 deletions heartbeat/hbtestllext/validators.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package hbtestllext

import (
"github.com/elastic/go-lookslike"
)

// MonitorTimespanValidator is tests for the `next_run` and `next_run_in.us` keys.
var MonitorTimespanValidator = lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{
"timespan": map[string]interface{}{
"gte": IsTime,
"lt": IsTime,
},
},
})
2 changes: 1 addition & 1 deletion heartbeat/include/fields.go

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"github.com/elastic/beats/heartbeat/hbtest"
"github.com/elastic/beats/heartbeat/monitors/wrappers"
schedule "github.com/elastic/beats/heartbeat/scheduler/schedule"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/file"
Expand Down Expand Up @@ -74,7 +75,8 @@ func testTLSRequest(t *testing.T, testURL string, useUrls bool, extraConfig map[
jobs, endpoints, err := create("tls", config)
require.NoError(t, err)

job := wrappers.WrapCommon(jobs, "tls", "", "http")[0]
sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(jobs, "tls", "", "http", sched, time.Duration(0))[0]

event := &beat.Event{}
_, err = job(event)
Expand Down Expand Up @@ -301,7 +303,8 @@ func TestLargeResponse(t *testing.T) {
jobs, _, err := create("largeresp", config)
require.NoError(t, err)

job := wrappers.WrapCommon(jobs, "test", "", "http")[0]
sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(jobs, "test", "", "http", sched, time.Duration(0))[0]

event := &beat.Event{}
_, err = job(event)
Expand Down Expand Up @@ -472,7 +475,8 @@ func TestRedirect(t *testing.T) {
jobs, _, err := create("redirect", config)
require.NoError(t, err)

job := wrappers.WrapCommon(jobs, "test", "", "http")[0]
sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(jobs, "test", "", "http", sched, time.Duration(0))[0]

event := &beat.Event{}
_, err = job(event)
Expand Down
16 changes: 9 additions & 7 deletions heartbeat/monitors/active/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@ import (
"os"
"strconv"
"testing"

"github.com/elastic/go-lookslike/isdef"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/go-lookslike"
"github.com/elastic/go-lookslike/testslike"

"github.com/elastic/beats/heartbeat/hbtest"
"github.com/elastic/beats/heartbeat/monitors/wrappers"
"github.com/elastic/beats/heartbeat/scheduler/schedule"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
btesting "github.com/elastic/beats/libbeat/testing"
"github.com/elastic/go-lookslike"
"github.com/elastic/go-lookslike/isdef"
"github.com/elastic/go-lookslike/testslike"
)

func testTCPCheck(t *testing.T, host string, port uint16) *beat.Event {
Expand All @@ -58,7 +58,8 @@ func testTCPConfigCheck(t *testing.T, configMap common.MapStr, host string, port
jobs, endpoints, err := create("tcp", config)
require.NoError(t, err)

job := wrappers.WrapCommon(jobs, "test", "", "tcp")[0]
sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(jobs, "test", "", "tcp", sched, time.Duration(0))[0]

event := &beat.Event{}
_, err = job(event)
Expand All @@ -81,7 +82,8 @@ func testTLSTCPCheck(t *testing.T, host string, port uint16, certFileName string
jobs, endpoints, err := create("tcp", config)
require.NoError(t, err)

job := wrappers.WrapCommon(jobs, "test", "", "tcp")[0]
sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(jobs, "test", "", "tcp", sched, time.Duration(0))[0]

event := &beat.Event{}
_, err = job(event)
Expand Down
10 changes: 5 additions & 5 deletions heartbeat/monitors/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,18 @@ import (
"sync"
"testing"

"github.com/elastic/go-lookslike/isdef"
"github.com/elastic/go-lookslike/validator"

"github.com/stretchr/testify/require"

"github.com/elastic/go-lookslike"

"github.com/elastic/beats/heartbeat/eventext"
"github.com/elastic/beats/heartbeat/hbtest"
"github.com/elastic/beats/heartbeat/hbtestllext"
"github.com/elastic/beats/heartbeat/monitors/jobs"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/go-lookslike"
"github.com/elastic/go-lookslike/isdef"
"github.com/elastic/go-lookslike/validator"
)

type MockBeatClient struct {
Expand Down Expand Up @@ -116,6 +115,7 @@ func mockEventMonitorValidator(id string) validator.Validator {
"check_group": isdef.IsString,
},
}),
hbtestllext.MonitorTimespanValidator,
hbtest.SummaryChecks(1, 0),
lookslike.MustCompile(mockEventCustomFields()),
))
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func newMonitorUnsafe(
}

rawJobs, endpoints, err := monitorPlugin.create(config)
wrappedJobs := wrappers.WrapCommon(rawJobs, m.id, m.name, m.typ)
wrappedJobs := wrappers.WrapCommon(rawJobs, m.id, m.name, m.typ, mpi.Schedule, mpi.Timeout)
m.endpoints = endpoints

if err != nil {
Expand Down
13 changes: 9 additions & 4 deletions heartbeat/monitors/pluginconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package monitors

import (
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/heartbeat/scheduler/schedule"
"github.com/elastic/beats/libbeat/common"
)

Expand All @@ -28,10 +31,12 @@ var ErrPluginDisabled = errors.New("Monitor not loaded, plugin is disabled")

// MonitorPluginInfo represents the generic configuration options around a monitor plugin.
type MonitorPluginInfo struct {
ID string `config:"id"`
Name string `config:"name"`
Type string `config:"type" validate:"required"`
Enabled bool `config:"enabled"`
ID string `config:"id"`
Name string `config:"name"`
Type string `config:"type" validate:"required"`
Schedule *schedule.Schedule `config:"schedule" validate:"required"`
Timeout time.Duration `config:"timeout"`
Enabled bool `config:"enabled"`
}

func pluginInfo(config *common.Config) (MonitorPluginInfo, error) {
Expand Down
29 changes: 23 additions & 6 deletions heartbeat/monitors/wrappers/monitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sync"
"time"

"github.com/elastic/beats/heartbeat/scheduler/schedule"

"github.com/gofrs/uuid"
"github.com/mitchellh/hashstructure"
"github.com/pkg/errors"
Expand All @@ -35,23 +37,24 @@ import (
)

// WrapCommon applies the common wrappers that all monitor jobs get.
func WrapCommon(js []jobs.Job, id string, name string, typ string) []jobs.Job {
func WrapCommon(js []jobs.Job, id string, name string, typ string, sched *schedule.Schedule, timeout time.Duration) []jobs.Job {
return jobs.WrapAllSeparately(
jobs.WrapAll(
js,
addMonitorStatus,
addMonitorDuration,
), func() jobs.JobWrapper {
return addMonitorMeta(id, name, typ, len(js) > 1)
return addMonitorMeta(id, name, typ, len(js) > 1, sched, timeout)
}, func() jobs.JobWrapper {
return makeAddSummary()
})
}

// addMonitorMeta adds the id, name, and type fields to the monitor.
func addMonitorMeta(id string, name string, typ string, isMulti bool) jobs.JobWrapper {
func addMonitorMeta(id string, name string, typ string, isMulti bool, sched *schedule.Schedule, timeout time.Duration) jobs.JobWrapper {
return func(job jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
started := time.Now()
cont, e := job(event)
thisID := id

Expand All @@ -69,9 +72,10 @@ func addMonitorMeta(id string, name string, typ string, isMulti bool) jobs.JobWr
event,
common.MapStr{
"monitor": common.MapStr{
"id": thisID,
"name": name,
"type": typ,
"id": thisID,
"name": name,
"type": typ,
"timespan": timespan(started, sched, timeout),
},
},
)
Expand All @@ -81,6 +85,19 @@ func addMonitorMeta(id string, name string, typ string, isMulti bool) jobs.JobWr
}
}

func timespan(started time.Time, sched *schedule.Schedule, timeout time.Duration) common.MapStr {
maxEnd := sched.Next(started)

if maxEnd.Sub(started) < timeout {
maxEnd = started.Add(timeout)
}

return common.MapStr{
"gte": started,
"lt": maxEnd,
}
}

// addMonitorStatus wraps the given Job's execution such that any error returned
// by the original Job will be set as a field. The original error will not be
// passed through as a return value. Errors may still be present but only if there
Expand Down
Loading

0 comments on commit 416aab0

Please sign in to comment.