diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a788d5f7fb1..a05591b3da0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -462,6 +462,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Enable `add_observer_metadata` processor in default config. {pull}11394[11394] - Record HTTP body metadata and optionally contents in `http.response.body.*` fields. {pull}13022[13022] +- Add `monitor.timespan` field for optimized queries in kibana. {pull}13672[13672] - Allow `hosts` to be used to configure http monitors {pull}13703[13703] - google-pubsub input: ACK pub/sub message when acknowledged by publisher. {issue}13346[13346] {pull}14715[14715] - Remove Beta label from google-pubsub input. {issue}13346[13346] {pull}14715[14715] diff --git a/heartbeat/_meta/fields.common.yml b/heartbeat/_meta/fields.common.yml index 7cd6393278b..28a721494f9 100644 --- a/heartbeat/_meta/fields.common.yml +++ b/heartbeat/_meta/fields.common.yml @@ -63,6 +63,11 @@ description: > A token unique to a simultaneously invoked group of checks as in the case where multiple IPs are checked for a single DNS entry. + - name: timespan + type: date_range + description: > + Time range this ping reported starting at the instant the check was started, ending at the start of the next scheduled check. + - key: summary title: "Monitor summary" description: diff --git a/heartbeat/docs/fields.asciidoc b/heartbeat/docs/fields.asciidoc index c02828a41e7..91be9bc8c08 100644 --- a/heartbeat/docs/fields.asciidoc +++ b/heartbeat/docs/fields.asciidoc @@ -296,6 +296,16 @@ type: keyword -- +*`monitor.timespan`*:: ++ +-- +Time range this ping reported starting at the instant the check was started, ending at the start of the next scheduled check. + + +type: date_range + +-- + [[exported-fields-docker-processor]] == Docker fields diff --git a/heartbeat/hbtest/hbtestutil.go b/heartbeat/hbtest/hbtestutil.go index 277da7432ce..05f09d0f1e9 100644 --- a/heartbeat/hbtest/hbtestutil.go +++ b/heartbeat/hbtest/hbtestutil.go @@ -30,6 +30,8 @@ import ( "strings" "testing" + "github.com/elastic/beats/heartbeat/hbtestllext" + "github.com/stretchr/testify/require" "github.com/elastic/beats/heartbeat/monitors/wrappers" @@ -124,17 +126,21 @@ func BaseChecks(ip string, status string, typ string) validator.Validator { } else { ipCheck = isdef.Optional(isdef.IsEqual(ip)) } - return lookslike.MustCompile(map[string]interface{}{ - "monitor": map[string]interface{}{ - "ip": ipCheck, - "duration.us": isdef.IsDuration, - "status": status, - "id": isdef.IsNonEmptyString, - "name": isdef.IsString, - "type": typ, - "check_group": isdef.IsString, - }, - }) + + return lookslike.Compose( + lookslike.MustCompile(map[string]interface{}{ + "monitor": map[string]interface{}{ + "ip": ipCheck, + "status": status, + "duration.us": isdef.IsDuration, + "id": isdef.IsNonEmptyString, + "name": isdef.IsString, + "type": typ, + "check_group": isdef.IsString, + }, + }), + hbtestllext.MonitorTimespanValidator, + ) } // SummaryChecks validates the "summary" field and its subfields. diff --git a/heartbeat/hbtestllext/isdefs.go b/heartbeat/hbtestllext/isdefs.go new file mode 100644 index 00000000000..b913c6c40da --- /dev/null +++ b/heartbeat/hbtestllext/isdefs.go @@ -0,0 +1,35 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package hbtestllext + +import ( + "time" + + "github.com/elastic/go-lookslike/isdef" + "github.com/elastic/go-lookslike/llpath" + "github.com/elastic/go-lookslike/llresult" +) + +// IsTime checks that the value is a time.Time instance. +var IsTime = isdef.Is("time", func(path llpath.Path, v interface{}) *llresult.Results { + _, ok := v.(time.Time) + if !ok { + return llresult.SimpleResult(path, false, "expected a time.Time") + } + return llresult.ValidResult(path) +}) diff --git a/heartbeat/hbtestllext/validators.go b/heartbeat/hbtestllext/validators.go new file mode 100644 index 00000000000..23a9df5d5cf --- /dev/null +++ b/heartbeat/hbtestllext/validators.go @@ -0,0 +1,32 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package hbtestllext + +import ( + "github.com/elastic/go-lookslike" +) + +// MonitorTimespanValidator is tests for the `next_run` and `next_run_in.us` keys. +var MonitorTimespanValidator = lookslike.MustCompile(map[string]interface{}{ + "monitor": map[string]interface{}{ + "timespan": map[string]interface{}{ + "gte": IsTime, + "lt": IsTime, + }, + }, +}) diff --git a/heartbeat/include/fields.go b/heartbeat/include/fields.go index 8c2cbeaa053..a0cecd709e5 100644 --- a/heartbeat/include/fields.go +++ b/heartbeat/include/fields.go @@ -32,5 +32,5 @@ func init() { // AssetFieldsYml returns asset data. // This is the base64 encoded gzipped contents of fields.yml. func AssetFieldsYml() string { - return "" + return "" } diff --git a/heartbeat/monitors/active/http/http_test.go b/heartbeat/monitors/active/http/http_test.go index 98339385a0c..9066306de86 100644 --- a/heartbeat/monitors/active/http/http_test.go +++ b/heartbeat/monitors/active/http/http_test.go @@ -35,6 +35,7 @@ import ( "github.com/elastic/beats/heartbeat/hbtest" "github.com/elastic/beats/heartbeat/monitors/wrappers" + schedule "github.com/elastic/beats/heartbeat/scheduler/schedule" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/file" @@ -74,7 +75,8 @@ func testTLSRequest(t *testing.T, testURL string, useUrls bool, extraConfig map[ jobs, endpoints, err := create("tls", config) require.NoError(t, err) - job := wrappers.WrapCommon(jobs, "tls", "", "http")[0] + sched, _ := schedule.Parse("@every 1s") + job := wrappers.WrapCommon(jobs, "tls", "", "http", sched, time.Duration(0))[0] event := &beat.Event{} _, err = job(event) @@ -301,7 +303,8 @@ func TestLargeResponse(t *testing.T) { jobs, _, err := create("largeresp", config) require.NoError(t, err) - job := wrappers.WrapCommon(jobs, "test", "", "http")[0] + sched, _ := schedule.Parse("@every 1s") + job := wrappers.WrapCommon(jobs, "test", "", "http", sched, time.Duration(0))[0] event := &beat.Event{} _, err = job(event) @@ -472,7 +475,8 @@ func TestRedirect(t *testing.T) { jobs, _, err := create("redirect", config) require.NoError(t, err) - job := wrappers.WrapCommon(jobs, "test", "", "http")[0] + sched, _ := schedule.Parse("@every 1s") + job := wrappers.WrapCommon(jobs, "test", "", "http", sched, time.Duration(0))[0] event := &beat.Event{} _, err = job(event) diff --git a/heartbeat/monitors/active/tcp/tcp_test.go b/heartbeat/monitors/active/tcp/tcp_test.go index 83230ef7762..22d42b0173f 100644 --- a/heartbeat/monitors/active/tcp/tcp_test.go +++ b/heartbeat/monitors/active/tcp/tcp_test.go @@ -27,19 +27,19 @@ import ( "os" "strconv" "testing" - - "github.com/elastic/go-lookslike/isdef" + "time" "github.com/stretchr/testify/require" - "github.com/elastic/go-lookslike" - "github.com/elastic/go-lookslike/testslike" - "github.com/elastic/beats/heartbeat/hbtest" "github.com/elastic/beats/heartbeat/monitors/wrappers" + "github.com/elastic/beats/heartbeat/scheduler/schedule" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" btesting "github.com/elastic/beats/libbeat/testing" + "github.com/elastic/go-lookslike" + "github.com/elastic/go-lookslike/isdef" + "github.com/elastic/go-lookslike/testslike" ) func testTCPCheck(t *testing.T, host string, port uint16) *beat.Event { @@ -58,7 +58,8 @@ func testTCPConfigCheck(t *testing.T, configMap common.MapStr, host string, port jobs, endpoints, err := create("tcp", config) require.NoError(t, err) - job := wrappers.WrapCommon(jobs, "test", "", "tcp")[0] + sched, _ := schedule.Parse("@every 1s") + job := wrappers.WrapCommon(jobs, "test", "", "tcp", sched, time.Duration(0))[0] event := &beat.Event{} _, err = job(event) @@ -81,7 +82,8 @@ func testTLSTCPCheck(t *testing.T, host string, port uint16, certFileName string jobs, endpoints, err := create("tcp", config) require.NoError(t, err) - job := wrappers.WrapCommon(jobs, "test", "", "tcp")[0] + sched, _ := schedule.Parse("@every 1s") + job := wrappers.WrapCommon(jobs, "test", "", "tcp", sched, time.Duration(0))[0] event := &beat.Event{} _, err = job(event) diff --git a/heartbeat/monitors/mocks_test.go b/heartbeat/monitors/mocks_test.go index 4c71775fe3f..6fa1793d4f7 100644 --- a/heartbeat/monitors/mocks_test.go +++ b/heartbeat/monitors/mocks_test.go @@ -23,19 +23,18 @@ import ( "sync" "testing" - "github.com/elastic/go-lookslike/isdef" - "github.com/elastic/go-lookslike/validator" - "github.com/stretchr/testify/require" - "github.com/elastic/go-lookslike" - "github.com/elastic/beats/heartbeat/eventext" "github.com/elastic/beats/heartbeat/hbtest" + "github.com/elastic/beats/heartbeat/hbtestllext" "github.com/elastic/beats/heartbeat/monitors/jobs" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/monitoring" + "github.com/elastic/go-lookslike" + "github.com/elastic/go-lookslike/isdef" + "github.com/elastic/go-lookslike/validator" ) type MockBeatClient struct { @@ -116,6 +115,7 @@ func mockEventMonitorValidator(id string) validator.Validator { "check_group": isdef.IsString, }, }), + hbtestllext.MonitorTimespanValidator, hbtest.SummaryChecks(1, 0), lookslike.MustCompile(mockEventCustomFields()), )) diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index c11fecca2c0..01213f2ad2d 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -163,7 +163,7 @@ func newMonitorUnsafe( } rawJobs, endpoints, err := monitorPlugin.create(config) - wrappedJobs := wrappers.WrapCommon(rawJobs, m.id, m.name, m.typ) + wrappedJobs := wrappers.WrapCommon(rawJobs, m.id, m.name, m.typ, mpi.Schedule, mpi.Timeout) m.endpoints = endpoints if err != nil { diff --git a/heartbeat/monitors/pluginconf.go b/heartbeat/monitors/pluginconf.go index 26df3252eed..20aace700fd 100644 --- a/heartbeat/monitors/pluginconf.go +++ b/heartbeat/monitors/pluginconf.go @@ -18,8 +18,11 @@ package monitors import ( + "time" + "github.com/pkg/errors" + "github.com/elastic/beats/heartbeat/scheduler/schedule" "github.com/elastic/beats/libbeat/common" ) @@ -28,10 +31,12 @@ var ErrPluginDisabled = errors.New("Monitor not loaded, plugin is disabled") // MonitorPluginInfo represents the generic configuration options around a monitor plugin. type MonitorPluginInfo struct { - ID string `config:"id"` - Name string `config:"name"` - Type string `config:"type" validate:"required"` - Enabled bool `config:"enabled"` + ID string `config:"id"` + Name string `config:"name"` + Type string `config:"type" validate:"required"` + Schedule *schedule.Schedule `config:"schedule" validate:"required"` + Timeout time.Duration `config:"timeout"` + Enabled bool `config:"enabled"` } func pluginInfo(config *common.Config) (MonitorPluginInfo, error) { diff --git a/heartbeat/monitors/wrappers/monitors.go b/heartbeat/monitors/wrappers/monitors.go index 4de60250582..c565d7481c0 100644 --- a/heartbeat/monitors/wrappers/monitors.go +++ b/heartbeat/monitors/wrappers/monitors.go @@ -22,6 +22,8 @@ import ( "sync" "time" + "github.com/elastic/beats/heartbeat/scheduler/schedule" + "github.com/gofrs/uuid" "github.com/mitchellh/hashstructure" "github.com/pkg/errors" @@ -35,23 +37,24 @@ import ( ) // WrapCommon applies the common wrappers that all monitor jobs get. -func WrapCommon(js []jobs.Job, id string, name string, typ string) []jobs.Job { +func WrapCommon(js []jobs.Job, id string, name string, typ string, sched *schedule.Schedule, timeout time.Duration) []jobs.Job { return jobs.WrapAllSeparately( jobs.WrapAll( js, addMonitorStatus, addMonitorDuration, ), func() jobs.JobWrapper { - return addMonitorMeta(id, name, typ, len(js) > 1) + return addMonitorMeta(id, name, typ, len(js) > 1, sched, timeout) }, func() jobs.JobWrapper { return makeAddSummary() }) } // addMonitorMeta adds the id, name, and type fields to the monitor. -func addMonitorMeta(id string, name string, typ string, isMulti bool) jobs.JobWrapper { +func addMonitorMeta(id string, name string, typ string, isMulti bool, sched *schedule.Schedule, timeout time.Duration) jobs.JobWrapper { return func(job jobs.Job) jobs.Job { return func(event *beat.Event) ([]jobs.Job, error) { + started := time.Now() cont, e := job(event) thisID := id @@ -69,9 +72,10 @@ func addMonitorMeta(id string, name string, typ string, isMulti bool) jobs.JobWr event, common.MapStr{ "monitor": common.MapStr{ - "id": thisID, - "name": name, - "type": typ, + "id": thisID, + "name": name, + "type": typ, + "timespan": timespan(started, sched, timeout), }, }, ) @@ -81,6 +85,19 @@ func addMonitorMeta(id string, name string, typ string, isMulti bool) jobs.JobWr } } +func timespan(started time.Time, sched *schedule.Schedule, timeout time.Duration) common.MapStr { + maxEnd := sched.Next(started) + + if maxEnd.Sub(started) < timeout { + maxEnd = started.Add(timeout) + } + + return common.MapStr{ + "gte": started, + "lt": maxEnd, + } +} + // addMonitorStatus wraps the given Job's execution such that any error returned // by the original Job will be set as a field. The original error will not be // passed through as a return value. Errors may still be present but only if there diff --git a/heartbeat/monitors/wrappers/monitors_test.go b/heartbeat/monitors/wrappers/monitors_test.go index 8f4d558b6fc..51e17890e49 100644 --- a/heartbeat/monitors/wrappers/monitors_test.go +++ b/heartbeat/monitors/wrappers/monitors_test.go @@ -20,22 +20,23 @@ package wrappers import ( "fmt" "net/url" + "reflect" "testing" - - "github.com/elastic/go-lookslike/isdef" - - "github.com/elastic/go-lookslike/validator" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/elastic/go-lookslike" - "github.com/elastic/go-lookslike/testslike" - "github.com/elastic/beats/heartbeat/eventext" + "github.com/elastic/beats/heartbeat/hbtestllext" "github.com/elastic/beats/heartbeat/monitors/jobs" + "github.com/elastic/beats/heartbeat/scheduler/schedule" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/go-lookslike" + "github.com/elastic/go-lookslike/isdef" + "github.com/elastic/go-lookslike/testslike" + "github.com/elastic/go-lookslike/validator" ) type fields struct { @@ -54,7 +55,8 @@ type testDef struct { func testCommonWrap(t *testing.T, tt testDef) { t.Run(tt.name, func(t *testing.T) { - wrapped := WrapCommon(tt.jobs, tt.fields.id, tt.fields.name, tt.fields.typ) + schedule, _ := schedule.Parse("@every 1s") + wrapped := WrapCommon(tt.jobs, tt.fields.id, tt.fields.name, tt.fields.typ, schedule, time.Duration(0)) results, err := jobs.ExecJobsAndConts(t, wrapped) assert.NoError(t, err) @@ -93,6 +95,7 @@ func TestSimpleJob(t *testing.T) { "check_group": isdef.IsString, }, }), + hbtestllext.MonitorTimespanValidator, summaryValidator(1, 0), )}, nil, @@ -127,6 +130,7 @@ func TestErrorJob(t *testing.T) { []validator.Validator{ lookslike.Compose( errorJobValidator, + hbtestllext.MonitorTimespanValidator, summaryValidator(0, 1), )}, nil, @@ -151,6 +155,7 @@ func TestMultiJobNoConts(t *testing.T) { "check_group": uniqScope.IsUniqueTo("check_group"), }, }), + hbtestllext.MonitorTimespanValidator, summaryValidator(1, 0), ) } @@ -199,6 +204,7 @@ func TestMultiJobConts(t *testing.T) { "check_group": uniqScope.IsUniqueTo(u), }, }), + hbtestllext.MonitorTimespanValidator, ) } @@ -258,6 +264,7 @@ func TestMultiJobContsCancelledEvents(t *testing.T) { "check_group": uniqScope.IsUniqueTo(u), }, }), + hbtestllext.MonitorTimespanValidator, ) } @@ -316,3 +323,44 @@ func summaryValidator(up int, down int) validator.Validator { }, }) } + +func TestTimespan(t *testing.T) { + now := time.Now() + sched10s, err := schedule.Parse("@every 10s") + require.NoError(t, err) + + type args struct { + started time.Time + sched *schedule.Schedule + timeout time.Duration + } + tests := []struct { + name string + args args + want common.MapStr + }{ + { + "interval longer than timeout", + args{now, sched10s, time.Second}, + common.MapStr{ + "gte": now, + "lt": now.Add(time.Second * 10), + }, + }, + { + "timeout longer than interval", + args{now, sched10s, time.Second * 20}, + common.MapStr{ + "gte": now, + "lt": now.Add(time.Second * 20), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := timespan(tt.args.started, tt.args.sched, tt.args.timeout); !reflect.DeepEqual(got, tt.want) { + t.Errorf("timespan() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/heartbeat/scheduler/schedule/schedule.go b/heartbeat/scheduler/schedule/schedule.go index f7c73129a81..696a6b8fa00 100644 --- a/heartbeat/scheduler/schedule/schedule.go +++ b/heartbeat/scheduler/schedule/schedule.go @@ -29,6 +29,7 @@ type Schedule struct { scheduler.Schedule } +// intervalScheduler defines a schedule that runs at fixed intervals. type intervalScheduler struct { interval time.Duration } diff --git a/libbeat/mapping/field.go b/libbeat/mapping/field.go index dd226201a76..66cde47e834 100644 --- a/libbeat/mapping/field.go +++ b/libbeat/mapping/field.go @@ -132,6 +132,8 @@ func (f *Field) validateType() error { return dateType.validate(f.Format) case "geo_point": return geoPointType.validate(f.Format) + case "date_range": + return dateRangeType.validate(f.Format) case "boolean", "binary", "ip", "alias", "array": if f.Format != "" { return fmt.Errorf("no format expected for field %s, found: %s", f.Name, f.Format) @@ -158,10 +160,11 @@ type fieldTypeGroup struct { } var ( - stringType = fieldTypeGroup{"string", []string{"string", "url"}} - numberType = fieldTypeGroup{"number", []string{"string", "url", "bytes", "duration", "number", "percent", "color"}} - dateType = fieldTypeGroup{"date", []string{"string", "url", "date"}} - geoPointType = fieldTypeGroup{"geo_point", []string{"geo_point"}} + stringType = fieldTypeGroup{"string", []string{"string", "url"}} + numberType = fieldTypeGroup{"number", []string{"string", "url", "bytes", "duration", "number", "percent", "color"}} + dateType = fieldTypeGroup{"date", []string{"string", "url", "date"}} + geoPointType = fieldTypeGroup{"geo_point", []string{"geo_point"}} + dateRangeType = fieldTypeGroup{"date_range", []string{"date_range"}} ) func (g *fieldTypeGroup) validate(formatter string) error { diff --git a/libbeat/tests/system/beat/beat.py b/libbeat/tests/system/beat/beat.py index d26ac75a071..d2f7f5b83fe 100644 --- a/libbeat/tests/system/beat/beat.py +++ b/libbeat/tests/system/beat/beat.py @@ -683,7 +683,9 @@ def is_documented(key, docs): for key in flat.keys(): metaKey = key.startswith('@metadata.') - if not(is_documented(key, expected_fields) or metaKey): + # Range keys as used in 'date_range' etc will not have docs of course + isRangeKey = key.split('.')[-1] in ['gte', 'gt', 'lte', 'lt'] + if not(is_documented(key, expected_fields) or metaKey or isRangeKey): raise Exception("Key '{}' found in event is not documented!".format(key)) if is_documented(key, aliases): raise Exception("Key '{}' found in event is documented as an alias!".format(key))