Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Add monitor.timespan field #14778

Merged
merged 21 commits into from
Dec 16, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

- Enable `add_observer_metadata` processor in default config. {pull}11394[11394]
- Record HTTP body metadata and optionally contents in `http.response.body.*` fields. {pull}13022[13022]
- Add `monitor.timespan` field for optimized queries in kibana. {pull}13672[13672]
- Allow `hosts` to be used to configure http monitors {pull}13703[13703]

*Journalbeat*
Expand Down
5 changes: 5 additions & 0 deletions heartbeat/_meta/fields.common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
description: >
A token unique to a simultaneously invoked group of checks as in the case where multiple IPs are checked for a single DNS entry.

- name: timespan
type: date_range
description: >
Time range this ping reported starting at the instant the check was started, ending at the start of the next scheduled check.

- key: summary
title: "Monitor summary"
description:
Expand Down
10 changes: 10 additions & 0 deletions heartbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,16 @@ type: keyword

--

*`monitor.timespan`*::
+
--
Time range this ping reported starting at the instant the check was started, ending at the start of the next scheduled check.


type: date_range

--

[[exported-fields-docker-processor]]
== Docker fields

Expand Down
28 changes: 17 additions & 11 deletions heartbeat/hbtest/hbtestutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"strings"
"testing"

"github.com/elastic/beats/heartbeat/hbtestllext"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/heartbeat/monitors/wrappers"
Expand Down Expand Up @@ -124,17 +126,21 @@ func BaseChecks(ip string, status string, typ string) validator.Validator {
} else {
ipCheck = isdef.Optional(isdef.IsEqual(ip))
}
return lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{
"ip": ipCheck,
"duration.us": isdef.IsDuration,
"status": status,
"id": isdef.IsNonEmptyString,
"name": isdef.IsString,
"type": typ,
"check_group": isdef.IsString,
},
})

return lookslike.Compose(
lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{
"ip": ipCheck,
"status": status,
"duration.us": isdef.IsDuration,
"id": isdef.IsNonEmptyString,
"name": isdef.IsString,
"type": typ,
"check_group": isdef.IsString,
},
}),
hbtestllext.MonitorTimespanValidator,
)
}

// SummaryChecks validates the "summary" field and its subfields.
Expand Down
35 changes: 35 additions & 0 deletions heartbeat/hbtestllext/isdefs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package hbtestllext

import (
"time"

"github.com/elastic/go-lookslike/isdef"
"github.com/elastic/go-lookslike/llpath"
"github.com/elastic/go-lookslike/llresult"
)

// IsTime checks that the value is a time.Time instance.
var IsTime = isdef.Is("time", func(path llpath.Path, v interface{}) *llresult.Results {
_, ok := v.(time.Time)
if !ok {
return llresult.SimpleResult(path, false, "expected a time.Time")
}
return llresult.ValidResult(path)
})
32 changes: 32 additions & 0 deletions heartbeat/hbtestllext/validators.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package hbtestllext

import (
"github.com/elastic/go-lookslike"
)

// MonitorTimespanValidator is tests for the `next_run` and `next_run_in.us` keys.
var MonitorTimespanValidator = lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{
"timespan": map[string]interface{}{
"gte": IsTime,
"lt": IsTime,
},
},
})
2 changes: 1 addition & 1 deletion heartbeat/include/fields.go

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"github.com/elastic/beats/heartbeat/hbtest"
"github.com/elastic/beats/heartbeat/monitors/wrappers"
schedule "github.com/elastic/beats/heartbeat/scheduler/schedule"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/file"
Expand Down Expand Up @@ -74,7 +75,8 @@ func testTLSRequest(t *testing.T, testURL string, useUrls bool, extraConfig map[
jobs, endpoints, err := create("tls", config)
require.NoError(t, err)

job := wrappers.WrapCommon(jobs, "tls", "", "http")[0]
sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(jobs, "tls", "", "http", sched, time.Duration(0))[0]

event := &beat.Event{}
_, err = job(event)
Expand Down Expand Up @@ -301,7 +303,8 @@ func TestLargeResponse(t *testing.T) {
jobs, _, err := create("largeresp", config)
require.NoError(t, err)

job := wrappers.WrapCommon(jobs, "test", "", "http")[0]
sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(jobs, "test", "", "http", sched, time.Duration(0))[0]

event := &beat.Event{}
_, err = job(event)
Expand Down Expand Up @@ -472,7 +475,8 @@ func TestRedirect(t *testing.T) {
jobs, _, err := create("redirect", config)
require.NoError(t, err)

job := wrappers.WrapCommon(jobs, "test", "", "http")[0]
sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(jobs, "test", "", "http", sched, time.Duration(0))[0]

event := &beat.Event{}
_, err = job(event)
Expand Down
16 changes: 9 additions & 7 deletions heartbeat/monitors/active/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@ import (
"os"
"strconv"
"testing"

"github.com/elastic/go-lookslike/isdef"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/go-lookslike"
"github.com/elastic/go-lookslike/testslike"

"github.com/elastic/beats/heartbeat/hbtest"
"github.com/elastic/beats/heartbeat/monitors/wrappers"
"github.com/elastic/beats/heartbeat/scheduler/schedule"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
btesting "github.com/elastic/beats/libbeat/testing"
"github.com/elastic/go-lookslike"
"github.com/elastic/go-lookslike/isdef"
"github.com/elastic/go-lookslike/testslike"
)

func testTCPCheck(t *testing.T, host string, port uint16) *beat.Event {
Expand All @@ -58,7 +58,8 @@ func testTCPConfigCheck(t *testing.T, configMap common.MapStr, host string, port
jobs, endpoints, err := create("tcp", config)
require.NoError(t, err)

job := wrappers.WrapCommon(jobs, "test", "", "tcp")[0]
sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(jobs, "test", "", "tcp", sched, time.Duration(0))[0]

event := &beat.Event{}
_, err = job(event)
Expand All @@ -81,7 +82,8 @@ func testTLSTCPCheck(t *testing.T, host string, port uint16, certFileName string
jobs, endpoints, err := create("tcp", config)
require.NoError(t, err)

job := wrappers.WrapCommon(jobs, "test", "", "tcp")[0]
sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(jobs, "test", "", "tcp", sched, time.Duration(0))[0]

event := &beat.Event{}
_, err = job(event)
Expand Down
10 changes: 5 additions & 5 deletions heartbeat/monitors/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,18 @@ import (
"sync"
"testing"

"github.com/elastic/go-lookslike/isdef"
"github.com/elastic/go-lookslike/validator"

"github.com/stretchr/testify/require"

"github.com/elastic/go-lookslike"

"github.com/elastic/beats/heartbeat/eventext"
"github.com/elastic/beats/heartbeat/hbtest"
"github.com/elastic/beats/heartbeat/hbtestllext"
"github.com/elastic/beats/heartbeat/monitors/jobs"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/go-lookslike"
"github.com/elastic/go-lookslike/isdef"
"github.com/elastic/go-lookslike/validator"
)

type MockBeatClient struct {
Expand Down Expand Up @@ -116,6 +115,7 @@ func mockEventMonitorValidator(id string) validator.Validator {
"check_group": isdef.IsString,
},
}),
hbtestllext.MonitorTimespanValidator,
hbtest.SummaryChecks(1, 0),
lookslike.MustCompile(mockEventCustomFields()),
))
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func newMonitorUnsafe(
}

rawJobs, endpoints, err := monitorPlugin.create(config)
wrappedJobs := wrappers.WrapCommon(rawJobs, m.id, m.name, m.typ)
wrappedJobs := wrappers.WrapCommon(rawJobs, m.id, m.name, m.typ, mpi.Schedule, mpi.Timeout)
m.endpoints = endpoints

if err != nil {
Expand Down
13 changes: 9 additions & 4 deletions heartbeat/monitors/pluginconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package monitors

import (
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/heartbeat/scheduler/schedule"
"github.com/elastic/beats/libbeat/common"
)

Expand All @@ -28,10 +31,12 @@ var ErrPluginDisabled = errors.New("Monitor not loaded, plugin is disabled")

// MonitorPluginInfo represents the generic configuration options around a monitor plugin.
type MonitorPluginInfo struct {
ID string `config:"id"`
Name string `config:"name"`
Type string `config:"type" validate:"required"`
Enabled bool `config:"enabled"`
ID string `config:"id"`
Name string `config:"name"`
Type string `config:"type" validate:"required"`
Schedule *schedule.Schedule `config:"schedule" validate:"required"`
Timeout time.Duration `config:"timeout"`
Enabled bool `config:"enabled"`
}

func pluginInfo(config *common.Config) (MonitorPluginInfo, error) {
Expand Down
29 changes: 23 additions & 6 deletions heartbeat/monitors/wrappers/monitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sync"
"time"

"github.com/elastic/beats/heartbeat/scheduler/schedule"

"github.com/gofrs/uuid"
"github.com/mitchellh/hashstructure"
"github.com/pkg/errors"
Expand All @@ -35,23 +37,24 @@ import (
)

// WrapCommon applies the common wrappers that all monitor jobs get.
func WrapCommon(js []jobs.Job, id string, name string, typ string) []jobs.Job {
func WrapCommon(js []jobs.Job, id string, name string, typ string, sched *schedule.Schedule, timeout time.Duration) []jobs.Job {
return jobs.WrapAllSeparately(
jobs.WrapAll(
js,
addMonitorStatus,
addMonitorDuration,
), func() jobs.JobWrapper {
return addMonitorMeta(id, name, typ, len(js) > 1)
return addMonitorMeta(id, name, typ, len(js) > 1, sched, timeout)
}, func() jobs.JobWrapper {
return makeAddSummary()
})
}

// addMonitorMeta adds the id, name, and type fields to the monitor.
func addMonitorMeta(id string, name string, typ string, isMulti bool) jobs.JobWrapper {
func addMonitorMeta(id string, name string, typ string, isMulti bool, sched *schedule.Schedule, timeout time.Duration) jobs.JobWrapper {
return func(job jobs.Job) jobs.Job {
return func(event *beat.Event) ([]jobs.Job, error) {
started := time.Now()
cont, e := job(event)
thisID := id

Expand All @@ -69,9 +72,10 @@ func addMonitorMeta(id string, name string, typ string, isMulti bool) jobs.JobWr
event,
common.MapStr{
"monitor": common.MapStr{
"id": thisID,
"name": name,
"type": typ,
"id": thisID,
"name": name,
"type": typ,
"timespan": timespan(started, sched, timeout),
},
},
)
Expand All @@ -81,6 +85,19 @@ func addMonitorMeta(id string, name string, typ string, isMulti bool) jobs.JobWr
}
}

func timespan(started time.Time, sched *schedule.Schedule, timeout time.Duration) common.MapStr {
maxEnd := sched.Next(started)

if maxEnd.Sub(started) < timeout {
maxEnd = started.Add(timeout)
}

return common.MapStr{
"gte": started,
"lt": maxEnd,
}
}

// addMonitorStatus wraps the given Job's execution such that any error returned
// by the original Job will be set as a field. The original error will not be
// passed through as a return value. Errors may still be present but only if there
Expand Down
Loading