From a33740874fb0b271d59bd7caf250dc55f49f1e5b Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Thu, 31 Aug 2023 15:59:38 -0500 Subject: [PATCH] [Heartbeat] Monitor Retries (#36147) Adds retries to Heartbeat monitors. Part of https://github.com/elastic/synthetics/issues/792 This refactors a ton of code around summarizing events, and cleans up a lot of tech debt as well. --- .devcontainer/devcontainer.json | 2 +- .golangci.yml | 3 + Jenkinsfile | 2 +- dev-tools/mage/gotest.go | 20 ++ dev-tools/mage/target/unittest/unittest.go | 11 +- heartbeat/_meta/fields.common.yml | 20 ++ .../autodiscover/builder/hints/monitors.go | 2 +- heartbeat/docs/fields.asciidoc | 50 ++++ heartbeat/hbtest/hbtestutil.go | 29 +- heartbeat/include/fields.go | 2 +- .../monitors/active/dialchain/dialchain.go | 2 +- heartbeat/monitors/active/http/http_test.go | 74 +++--- heartbeat/monitors/active/icmp/icmp_test.go | 2 +- heartbeat/monitors/active/tcp/socks5_test.go | 2 +- heartbeat/monitors/active/tcp/tcp_test.go | 12 +- heartbeat/monitors/active/tcp/tls_test.go | 8 +- heartbeat/monitors/jobs/job.go | 18 -- heartbeat/monitors/jobs/testing.go | 13 +- heartbeat/monitors/mocks.go | 2 +- heartbeat/monitors/monitor.go | 26 +- heartbeat/monitors/stdfields/stdfields.go | 6 +- heartbeat/monitors/task.go | 3 +- .../wrappers/monitorstate/esloader_test.go | 6 +- .../wrappers/monitorstate/monitorstate.go | 15 +- .../monitorstate/monitorstate_test.go | 12 +- .../monitors/wrappers/monitorstate/tracker.go | 16 +- .../wrappers/monitorstate/tracker_test.go | 18 +- .../wrappers/summarizer/summarizer.go | 167 ++++++++++++ .../wrappers/summarizer/summarizer_test.go | 181 +++++++++++++ .../summarizertesthelper/testhelper.go | 56 ++++ heartbeat/monitors/wrappers/wrappers.go | 184 ++----------- heartbeat/monitors/wrappers/wrappers_test.go | 251 ++++++++++++------ heartbeat/tracer/tracer_test.go | 3 +- x-pack/heartbeat/include/fields.go | 2 +- x-pack/heartbeat/monitors/browser/browser.go | 2 +- x-pack/heartbeat/monitors/browser/config.go | 2 +- .../heartbeat/monitors/browser/config_test.go | 2 +- .../monitors/browser/source/inline.go | 2 +- .../monitors/browser/source/inline_test.go | 2 +- .../monitors/browser/source/local.go | 2 +- .../monitors/browser/source/local_test.go | 2 +- .../monitors/browser/source/offline.go | 2 +- .../monitors/browser/source/project.go | 2 +- .../monitors/browser/source/project_test.go | 2 +- .../monitors/browser/source/source.go | 2 +- .../monitors/browser/source/source_test.go | 2 +- .../monitors/browser/source/unzip.go | 3 +- .../monitors/browser/source/zipurl.go | 2 +- .../monitors/browser/source/zipurl_test.go | 2 +- .../heartbeat/monitors/browser/sourcejob.go | 2 +- .../monitors/browser/sourcejob_test.go | 2 +- .../monitors/browser/synthexec/enrich.go | 18 +- .../monitors/browser/synthexec/enrich_test.go | 30 +-- .../browser/synthexec/execmultiplexer.go | 2 +- .../browser/synthexec/execmultiplexer_test.go | 4 +- .../monitors/browser/synthexec/synthexec.go | 2 +- .../browser/synthexec/synthexec_linux.go | 2 +- .../browser/synthexec/synthexec_test.go | 2 +- .../monitors/browser/synthexec/synthtypes.go | 2 +- .../browser/synthexec/synthtypes_test.go | 2 +- .../browser/synthexec/testcmd/main.go | 2 +- x-pack/heartbeat/scenarios/basics_test.go | 81 ++++-- .../heartbeat/scenarios/browserscenarios.go | 2 +- .../scenarios/framework/fakeloader.go | 35 +-- .../scenarios/framework/framework.go | 74 ++++-- x-pack/heartbeat/scenarios/scenarios.go | 36 --- .../heartbeat/scenarios/stateloader_test.go | 26 +- x-pack/heartbeat/scenarios/testws.go | 68 +++++ x-pack/heartbeat/scenarios/twists.go | 17 +- 69 files changed, 1098 insertions(+), 562 deletions(-) create mode 100644 heartbeat/monitors/wrappers/summarizer/summarizer.go create mode 100644 heartbeat/monitors/wrappers/summarizer/summarizer_test.go create mode 100644 heartbeat/monitors/wrappers/summarizer/summarizertesthelper/testhelper.go create mode 100644 x-pack/heartbeat/scenarios/testws.go diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 27348930fdfa..571faef670aa 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -21,7 +21,7 @@ // Use 'postCreateCommand' to run commands after the container is created. // Mage is installed this way, and not via the feature plugin because that plugin was // broken for me, and mage install is simple enough - "postCreateCommand": "cd /opt/; sudo mkdir mage; sudo chown $USER:$(id -g) mage; git clone --depth=1 https://github.com/magefile/mage && cd mage && go run bootstrap.go" + "postCreateCommand": "cd /opt/; sudo mkdir mage; sudo chown $USER:$(id -g) mage; git clone --depth=1 https://github.com/magefile/mage && cd mage && go run bootstrap.go; npm i -g @elastic/synthetics; sudo env \"PATH=$PATH\" npx -yes playwright install-deps" // Configure tool-specific properties. // "customizations": {}, diff --git a/.golangci.yml b/.golangci.yml index 201454172eab..834881d49d72 100755 --- a/.golangci.yml +++ b/.golangci.yml @@ -2,6 +2,9 @@ run: # timeout for analysis, e.g. 30s, 5m, default is 1m timeout: 15m + build-tags: + - synthetics + - integration issues: # Maximum count of issues with the same text. diff --git a/Jenkinsfile b/Jenkinsfile index 68ea32bf2321..32276ca72f5f 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -353,7 +353,7 @@ def withTools(Map args = [:], Closure body) { body() } } else if (args.get('nodejs', false)) { - withNodeJSEnv() { + withNodeJSEnv(version: '18.17.1') { withEnv(["ELASTIC_SYNTHETICS_CAPABLE=true"]) { cmd(label: "Install @elastic/synthetics", script: "npm i -g @elastic/synthetics") body() diff --git a/dev-tools/mage/gotest.go b/dev-tools/mage/gotest.go index 082d9748a39d..5065882fdb8f 100644 --- a/dev-tools/mage/gotest.go +++ b/dev-tools/mage/gotest.go @@ -107,6 +107,25 @@ func DefaultGoTestUnitArgs() GoTestArgs { return makeGoTestArgs("Unit") } func DefaultGoTestIntegrationArgs() GoTestArgs { args := makeGoTestArgs("Integration") args.Tags = append(args.Tags, "integration") + + synth := exec.Command("npx", "@elastic/synthetics", "-h") + if synth.Run() == nil { + // Run an empty journey to ensure playwright can be loaded + // catches situations like missing playwright deps + cmd := exec.Command("sh", "-c", "echo 'step(\"t\", () => { })' | elastic-synthetics --inline") + var out strings.Builder + cmd.Stdout = &out + cmd.Stderr = &out + err := cmd.Run() + if err != nil || cmd.ProcessState.ExitCode() != 0 { + fmt.Printf("synthetics is available, but not invokable, command exited with bad code: %s\n", out.String()) + } + + fmt.Println("npx @elastic/synthetics found, will run with synthetics tags") + os.Setenv("ELASTIC_SYNTHETICS_CAPABLE", "true") + args.Tags = append(args.Tags, "synthetics") + } + // Use the non-cachable -count=1 flag to disable test caching when running integration tests. // There are reasons to re-run tests even if the code is unchanged (e.g. Dockerfile changes). args.ExtraFlags = append(args.ExtraFlags, "-count=1") @@ -125,6 +144,7 @@ func DefaultGoTestIntegrationFromHostArgs() GoTestArgs { // module integration tests. We tag integration test files with 'integration'. func GoTestIntegrationArgsForModule(module string) GoTestArgs { args := makeGoTestArgsForModule("Integration", module) + args.Tags = append(args.Tags, "integration") return args } diff --git a/dev-tools/mage/target/unittest/unittest.go b/dev-tools/mage/target/unittest/unittest.go index d4201421177c..c6f6afc0a5f6 100644 --- a/dev-tools/mage/target/unittest/unittest.go +++ b/dev-tools/mage/target/unittest/unittest.go @@ -20,6 +20,7 @@ package unittest import ( "context" "fmt" + "os/exec" "github.com/magefile/mage/mg" @@ -55,7 +56,15 @@ func UnitTest() { // Use RACE_DETECTOR=true to enable the race detector. func GoUnitTest(ctx context.Context) error { mg.SerialCtxDeps(ctx, goTestDeps...) - return devtools.GoTest(ctx, devtools.DefaultGoTestUnitArgs()) + + utArgs := devtools.DefaultGoTestUnitArgs() + // If synthetics is installed run synthetics unit tests + synth := exec.Command("npx", "@elastic/synthetics", "-h") + if synth.Run() == nil { + fmt.Printf("npx @elastic/synthetics found, will run with synthetics tags") + utArgs.Tags = append(utArgs.Tags, "synthetics") + } + return devtools.GoTest(ctx, utArgs) } // PythonUnitTest executes the python system tests. diff --git a/heartbeat/_meta/fields.common.yml b/heartbeat/_meta/fields.common.yml index dc0f5bb7c438..b710fdde2e5d 100644 --- a/heartbeat/_meta/fields.common.yml +++ b/heartbeat/_meta/fields.common.yml @@ -175,6 +175,26 @@ type: integer description: > The number of endpoints that failed + - name: status + type: keyword + description: > + The status of this check as a whole. Either up or down. + - name: attempt + type: short + description: > + When performing a check this number is 1 for the first check, and increments in the event of a retry. + - name: max_attempts + type: short + description: > + The maximum number of checks that may be performed. Note, the actual number may be smaller. + - name: final_attempt + type: boolean + description: > + True if no further checks will be performed in this retry group. + - name: retry_group + type: keyword + description: > + A unique token used to group checks across attempts. - key: service title: "APM Service" description: diff --git a/heartbeat/autodiscover/builder/hints/monitors.go b/heartbeat/autodiscover/builder/hints/monitors.go index 3998fa0958bd..66487868e911 100644 --- a/heartbeat/autodiscover/builder/hints/monitors.go +++ b/heartbeat/autodiscover/builder/hints/monitors.go @@ -170,7 +170,7 @@ func (hb *heartbeatHints) getHostsWithPort(hints mapstr.M, port int, podEvent bo return nil, fmt.Errorf("no hosts selected for port %d with hints: %+v", port, thosts) } - var result []string + result := make([]string, 0, len(hostSet)) for host := range hostSet { result = append(result, host) } diff --git a/heartbeat/docs/fields.asciidoc b/heartbeat/docs/fields.asciidoc index cb5516ecf5d6..bda227294490 100644 --- a/heartbeat/docs/fields.asciidoc +++ b/heartbeat/docs/fields.asciidoc @@ -16427,6 +16427,56 @@ type: integer -- +*`summary.status`*:: ++ +-- +The status of this check as a whole. Either up or down. + + +type: keyword + +-- + +*`summary.attempt`*:: ++ +-- +When performing a check this number is 1 for the first check, and increments in the event of a retry. + + +type: short + +-- + +*`summary.max_attempts`*:: ++ +-- +The maximum number of checks that may be performed. Note, the actual number may be smaller. + + +type: short + +-- + +*`summary.final_attempt`*:: ++ +-- +True if no further checks will be performed in this retry group. + + +type: boolean + +-- + +*`summary.retry_group`*:: ++ +-- +A unique token used to group checks across attempts. + + +type: keyword + +-- + [[exported-fields-synthetics]] == Synthetics types fields diff --git a/heartbeat/hbtest/hbtestutil.go b/heartbeat/hbtest/hbtestutil.go index eae7fb73e58e..86c1e4a34d20 100644 --- a/heartbeat/hbtest/hbtestutil.go +++ b/heartbeat/hbtest/hbtestutil.go @@ -39,6 +39,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/ecserr" "github.com/elastic/beats/v7/heartbeat/monitors/active/dialchain/tlsmeta" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/summarizertesthelper" "github.com/elastic/beats/v7/heartbeat/hbtestllext" @@ -142,17 +143,13 @@ func TLSChecks(chainIndex, certIndex int, certificate *x509.Certificate) validat PeerCertificates: []*x509.Certificate{certificate}, }, time.Duration(1)) - //nolint:errcheck // There are no new changes to this line but - // linter has been activated in the meantime. We'll cleanup separately. - expected.Put("tls.rtt.handshake.us", hbtestllext.IsInt64) + _, _ = expected.Put("tls.rtt.handshake.us", hbtestllext.IsInt64) // Generally, the exact cipher will match, but on windows 7 32bit this is not true! // We don't actually care about the exact cipher matching, since we're not testing the TLS // implementation, we trust go there, just that most of the metadata is present if runtime.GOOS == "windows" && bits.UintSize == 32 { - //nolint:errcheck // There are no new changes to this line but - // linter has been activated in the meantime. We'll cleanup separately. - expected.Put("tls.cipher", isdef.IsString) + _, _ = expected.Put("tls.cipher", isdef.IsString) } return lookslike.MustCompile(expected) @@ -190,15 +187,14 @@ func BaseChecks(ip string, status string, typ string) validator.Validator { ) } -// SummaryChecks validates the "summary" + "state" fields -func SummaryChecks(up int, down int) validator.Validator { - return lookslike.MustCompile(map[string]interface{}{ - "summary": map[string]interface{}{ - "up": uint16(up), - "down": uint16(down), - }, - "state": hbtestllext.IsMonitorState, - }) +// SummaryStateChecks validates the "summary" + "state" fields +func SummaryStateChecks(up uint16, down uint16) validator.Validator { + return lookslike.Compose( + summarizertesthelper.SummaryValidator(up, down), + lookslike.MustCompile(map[string]interface{}{ + "state": hbtestllext.IsMonitorState, + }), + ) } // ResolveChecks returns a lookslike matcher for the 'resolve' fields. @@ -289,8 +285,7 @@ func StartHTTPSServer(t *testing.T, tlsCert tls.Certificate) (host string, port require.NoError(t, err) // No need to start a real server, since this is invalid, we just - //nolint:gosec // There are no new changes to this line but - // linter has been activated in the meantime. We'll cleanup separately. + //nolint:gosec // it's a test, sec issues don't apply l, err := tls.Listen("tcp", "127.0.0.1:0", &tls.Config{ Certificates: []tls.Certificate{tlsCert}, }) diff --git a/heartbeat/include/fields.go b/heartbeat/include/fields.go index 88552169280a..d5323f72605d 100644 --- a/heartbeat/include/fields.go +++ b/heartbeat/include/fields.go @@ -32,5 +32,5 @@ func init() { // AssetFieldsYml returns asset data. // This is the base64 encoded zlib format compressed contents of fields.yml. func AssetFieldsYml() string { - return "" + return "" } diff --git a/heartbeat/monitors/active/dialchain/dialchain.go b/heartbeat/monitors/active/dialchain/dialchain.go index 62f2730a7907..f7c82aa0bec7 100644 --- a/heartbeat/monitors/active/dialchain/dialchain.go +++ b/heartbeat/monitors/active/dialchain/dialchain.go @@ -56,7 +56,7 @@ func (c *DialerChain) Clone() *DialerChain { func (c *DialerChain) Build(event *beat.Event) (d transport.Dialer, err error) { d, err = c.Net.build(event) if err != nil { - return + return d, err } for _, layer := range c.Layers { diff --git a/heartbeat/monitors/active/http/http_test.go b/heartbeat/monitors/active/http/http_test.go index 27501c32da1f..78a2f24599c8 100644 --- a/heartbeat/monitors/active/http/http_test.go +++ b/heartbeat/monitors/active/http/http_test.go @@ -49,6 +49,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/ecserr" "github.com/elastic/beats/v7/heartbeat/hbtest" "github.com/elastic/beats/v7/heartbeat/hbtestllext" + "github.com/elastic/beats/v7/heartbeat/monitors/jobs" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/beats/v7/heartbeat/scheduler/schedule" @@ -262,7 +263,7 @@ func TestUpStatuses(t *testing.T) { lookslike.Compose( hbtest.BaseChecks("127.0.0.1", "up", "http"), hbtest.RespondingTCPChecks(), - hbtest.SummaryChecks(1, 0), + hbtest.SummaryStateChecks(1, 0), respondingHTTPChecks(server.URL, "text/plain; charset=utf-8", status), ), event.Fields, @@ -279,7 +280,7 @@ func TestHeadersDisabled(t *testing.T) { lookslike.Strict(lookslike.Compose( hbtest.BaseChecks("127.0.0.1", "up", "http"), hbtest.RespondingTCPChecks(), - hbtest.SummaryChecks(1, 0), + hbtest.SummaryStateChecks(1, 0), respondingHTTPChecks(server.URL, "text/plain; charset=utf-8", 200), )), event.Fields, @@ -297,7 +298,7 @@ func TestDownStatuses(t *testing.T) { lookslike.Strict(lookslike.Compose( hbtest.BaseChecks("127.0.0.1", "down", "http"), hbtest.RespondingTCPChecks(), - hbtest.SummaryChecks(0, 1), + hbtest.SummaryStateChecks(0, 1), respondingHTTPChecks(server.URL, "text/plain; charset=utf-8", status), hbtest.ECSErrChecks(ecserr.NewBadHTTPStatusErr(status)), respondingHTTPBodyChecks("hello, world!"), @@ -336,7 +337,7 @@ func TestLargeResponse(t *testing.T) { lookslike.Strict(lookslike.Compose( hbtest.BaseChecks("127.0.0.1", "up", "http"), hbtest.RespondingTCPChecks(), - hbtest.SummaryChecks(1, 0), + hbtest.SummaryStateChecks(1, 0), respondingHTTPChecks(server.URL, "text/plain; charset=utf-8", 200), )), event.Fields, @@ -453,7 +454,7 @@ func TestJsonBody(t *testing.T) { lookslike.Strict(lookslike.Compose( hbtest.BaseChecks("127.0.0.1", "up", "http"), hbtest.RespondingTCPChecks(), - hbtest.SummaryChecks(1, 0), + hbtest.SummaryStateChecks(1, 0), respondingHTTPChecks(server.URL, tc.expectedContentType, 200), )), event.Fields, @@ -464,7 +465,7 @@ func TestJsonBody(t *testing.T) { lookslike.Strict(lookslike.Compose( hbtest.BaseChecks("127.0.0.1", "down", "http"), hbtest.RespondingTCPChecks(), - hbtest.SummaryChecks(0, 1), + hbtest.SummaryStateChecks(0, 1), hbtest.ErrorChecks(tc.expectedErrMsg, "validate"), respondingHTTPChecks(server.URL, tc.expectedContentType, 200), )), @@ -530,7 +531,7 @@ func runHTTPSServerCheck( hbtest.BaseChecks("127.0.0.1", "up", "http"), hbtest.RespondingTCPChecks(), hbtest.TLSChecks(0, 0, cert), - hbtest.SummaryChecks(1, 0), + hbtest.SummaryStateChecks(1, 0), respondingHTTPChecks(server.URL, "text/plain; charset=utf-8", http.StatusOK), )), event.Fields, @@ -559,7 +560,7 @@ func TestExpiredHTTPSServer(t *testing.T) { lookslike.Strict(lookslike.Compose( hbtest.BaseChecks("127.0.0.1", "down", "http"), hbtest.RespondingTCPChecks(), - hbtest.SummaryChecks(0, 1), + hbtest.SummaryStateChecks(0, 1), hbtest.ExpiredCertChecks(cert), hbtest.URLChecks(t, &url.URL{Scheme: "https", Host: net.JoinHostPort(host, port)}), // No HTTP fields expected because we fail at the TCP level @@ -617,7 +618,7 @@ func TestConnRefusedJob(t *testing.T) { t, lookslike.Strict(lookslike.Compose( hbtest.BaseChecks(ip, "down", "http"), - hbtest.SummaryChecks(0, 1), + hbtest.SummaryStateChecks(0, 1), hbtest.ECSErrCodeChecks(ecserr.CODE_NET_COULD_NOT_CONNECT, fmt.Sprintf("%s:%d", ip, port)), urlChecks(url), )), @@ -639,7 +640,7 @@ func TestUnreachableJob(t *testing.T) { t, lookslike.Strict(lookslike.Compose( hbtest.BaseChecks(ip, "down", "http"), - hbtest.SummaryChecks(0, 1), + hbtest.SummaryStateChecks(0, 1), hbtest.ECSErrCodeChecks(ecserr.CODE_NET_COULD_NOT_CONNECT, fmt.Sprintf("%s:%d", ip, port)), urlChecks(url), )), @@ -673,33 +674,30 @@ func TestRedirect(t *testing.T) { sched, _ := schedule.Parse("@every 1s") job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1}, nil)[0] - // Run this test multiple times since in the past we had an issue where the redirects - // list was added onto by each request. See https://github.com/elastic/beats/pull/15944 - for i := 0; i < 10; i++ { - event := &beat.Event{} - _, err = job(event) - require.NoError(t, err) - - testslike.Test( - t, - lookslike.Compose( - hbtest.BaseChecks("", "up", "http"), - hbtest.SummaryChecks(1, 0), - minimalRespondingHTTPChecks(testURL, "text/plain; charset=utf-8", 200), - respondingHTTPHeaderChecks(), - lookslike.MustCompile(map[string]interface{}{ - // For redirects that are followed we shouldn't record this header because there's no sensible - // value - "http.response.headers.Location": isdef.KeyMissing, - "http.response.redirects": []string{ - server.URL + redirectingPaths["/redirect_one"], - server.URL + redirectingPaths["/redirect_two"], - }, - }), - ), - event.Fields, - ) - } + events, err := jobs.ExecJobAndConts(t, job) + require.NoError(t, err) + require.Len(t, events, 1) + event := events[0] + + testslike.Test( + t, + lookslike.Compose( + hbtest.BaseChecks("", "up", "http"), + minimalRespondingHTTPChecks(testURL, "text/plain; charset=utf-8", 200), + respondingHTTPHeaderChecks(), + hbtest.SummaryStateChecks(1, 0), + lookslike.MustCompile(map[string]interface{}{ + // For redirects that are followed we shouldn't record this header because there's no sensible + // value + "http.response.headers.Location": isdef.KeyMissing, + "http.response.redirects": []string{ + server.URL + redirectingPaths["/redirect_one"], + server.URL + redirectingPaths["/redirect_two"], + }, + }), + ), + event.Fields, + ) } func TestNoHeaders(t *testing.T) { @@ -728,7 +726,7 @@ func TestNoHeaders(t *testing.T) { t, lookslike.Strict(lookslike.Compose( hbtest.BaseChecks("127.0.0.1", "up", "http"), - hbtest.SummaryChecks(1, 0), + hbtest.SummaryStateChecks(1, 0), hbtest.RespondingTCPChecks(), respondingHTTPStatusAndTimingChecks(200), minimalRespondingHTTPChecks(server.URL, "text/plain; charset=utf-8", 200), diff --git a/heartbeat/monitors/active/icmp/icmp_test.go b/heartbeat/monitors/active/icmp/icmp_test.go index f4597d0a4db8..59ffc257505d 100644 --- a/heartbeat/monitors/active/icmp/icmp_test.go +++ b/heartbeat/monitors/active/icmp/icmp_test.go @@ -50,7 +50,7 @@ func TestICMPFields(t *testing.T) { validator := lookslike.Strict( lookslike.Compose( hbtest.BaseChecks(ip, "up", "icmp"), - hbtest.SummaryChecks(1, 0), + hbtest.SummaryStateChecks(1, 0), hbtest.URLChecks(t, hostURL), hbtest.ResolveChecks(ip), lookslike.MustCompile(map[string]interface{}{ diff --git a/heartbeat/monitors/active/tcp/socks5_test.go b/heartbeat/monitors/active/tcp/socks5_test.go index 466253365c3a..e9d1af359210 100644 --- a/heartbeat/monitors/active/tcp/socks5_test.go +++ b/heartbeat/monitors/active/tcp/socks5_test.go @@ -78,7 +78,7 @@ func TestSocks5Job(t *testing.T) { hbtest.BaseChecks(ip, "up", "tcp"), hbtest.RespondingTCPChecks(), hbtest.SimpleURLChecks(t, "tcp", host, port), - hbtest.SummaryChecks(1, 0), + hbtest.SummaryStateChecks(1, 0), hbtest.ResolveChecks(ip), lookslike.MustCompile(map[string]interface{}{ "tcp": map[string]interface{}{ diff --git a/heartbeat/monitors/active/tcp/tcp_test.go b/heartbeat/monitors/active/tcp/tcp_test.go index 62b01f4a4ead..c5cc0dd614e6 100644 --- a/heartbeat/monitors/active/tcp/tcp_test.go +++ b/heartbeat/monitors/active/tcp/tcp_test.go @@ -100,7 +100,7 @@ func TestUpEndpointJob(t *testing.T) { validators := []validator.Validator{ hbtest.BaseChecks(serverURL.Hostname(), "up", "tcp"), - hbtest.SummaryChecks(1, 0), + hbtest.SummaryStateChecks(1, 0), hbtest.URLChecks(t, hostURL), hbtest.RespondingTCPChecks(), } @@ -130,7 +130,7 @@ func TestConnectionRefusedEndpointJob(t *testing.T) { t, lookslike.Strict(lookslike.Compose( hbtest.BaseChecks(ip, "down", "tcp"), - hbtest.SummaryChecks(0, 1), + hbtest.SummaryStateChecks(0, 1), hbtest.SimpleURLChecks(t, "tcp", ip, port), hbtest.ECSErrCodeChecks(ecserr.CODE_NET_COULD_NOT_CONNECT, dialErr), )), @@ -148,7 +148,7 @@ func TestUnreachableEndpointJob(t *testing.T) { t, lookslike.Strict(lookslike.Compose( hbtest.BaseChecks(ip, "down", "tcp"), - hbtest.SummaryChecks(0, 1), + hbtest.SummaryStateChecks(0, 1), hbtest.SimpleURLChecks(t, "tcp", ip, port), hbtest.ECSErrCodeChecks(ecserr.CODE_NET_COULD_NOT_CONNECT, dialErr), )), @@ -177,7 +177,7 @@ func TestCheckUp(t *testing.T) { hbtest.BaseChecks(ip, "up", "tcp"), hbtest.RespondingTCPChecks(), hbtest.SimpleURLChecks(t, "tcp", host, port), - hbtest.SummaryChecks(1, 0), + hbtest.SummaryStateChecks(1, 0), hbtest.ResolveChecks(ip), lookslike.MustCompile(map[string]interface{}{ "tcp": map[string]interface{}{ @@ -209,7 +209,7 @@ func TestCheckDown(t *testing.T) { hbtest.BaseChecks(ip, "down", "tcp"), hbtest.RespondingTCPChecks(), hbtest.SimpleURLChecks(t, "tcp", host, port), - hbtest.SummaryChecks(0, 1), + hbtest.SummaryStateChecks(0, 1), hbtest.ResolveChecks(ip), lookslike.MustCompile(map[string]interface{}{ "tcp": map[string]interface{}{ @@ -233,7 +233,7 @@ func TestNXDomainJob(t *testing.T) { t, lookslike.Strict(lookslike.Compose( hbtest.BaseChecks("", "down", "tcp"), - hbtest.SummaryChecks(0, 1), + hbtest.SummaryStateChecks(0, 1), hbtest.SimpleURLChecks(t, "tcp", host, port), hbtest.ErrorChecks(dialErr, "io"), )), diff --git a/heartbeat/monitors/active/tcp/tls_test.go b/heartbeat/monitors/active/tcp/tls_test.go index 0cf3110360bb..1a67a7945aea 100644 --- a/heartbeat/monitors/active/tcp/tls_test.go +++ b/heartbeat/monitors/active/tcp/tls_test.go @@ -57,7 +57,7 @@ func TestTLSSANIPConnection(t *testing.T) { hbtest.TLSChecks(0, 0, cert), hbtest.RespondingTCPChecks(), hbtest.BaseChecks(ip, "up", "tcp"), - hbtest.SummaryChecks(1, 0), + hbtest.SummaryStateChecks(1, 0), hbtest.SimpleURLChecks(t, "ssl", ip, port), )), event.Fields, @@ -77,7 +77,7 @@ func TestTLSHostname(t *testing.T) { hbtest.TLSChecks(0, 0, cert), hbtest.RespondingTCPChecks(), hbtest.BaseChecks(ip, "up", "tcp"), - hbtest.SummaryChecks(1, 0), + hbtest.SummaryStateChecks(1, 0), hbtest.SimpleURLChecks(t, "ssl", hostname, port), hbtest.ResolveChecks(ip), )), @@ -103,7 +103,7 @@ func TestTLSInvalidCert(t *testing.T) { lookslike.Strict(lookslike.Compose( hbtest.RespondingTCPChecks(), hbtest.BaseChecks(ip, "down", "tcp"), - hbtest.SummaryChecks(0, 1), + hbtest.SummaryStateChecks(0, 1), hbtest.SimpleURLChecks(t, "ssl", mismatchedHostname, port), hbtest.ResolveChecks(ip), lookslike.MustCompile(map[string]interface{}{ @@ -137,7 +137,7 @@ func TestTLSExpiredCert(t *testing.T) { lookslike.Strict(lookslike.Compose( hbtest.RespondingTCPChecks(), hbtest.BaseChecks(ip, "down", "tcp"), - hbtest.SummaryChecks(0, 1), + hbtest.SummaryStateChecks(0, 1), hbtest.SimpleURLChecks(t, "ssl", host, port), hbtest.ResolveChecks(ip), hbtest.ExpiredCertChecks(cert), diff --git a/heartbeat/monitors/jobs/job.go b/heartbeat/monitors/jobs/job.go index b24377fd28b2..70811478cb30 100644 --- a/heartbeat/monitors/jobs/job.go +++ b/heartbeat/monitors/jobs/job.go @@ -48,24 +48,6 @@ func WrapAll(jobs []Job, wrappers ...JobWrapper) []Job { return wrapped } -// JobWrapperFactory can be used to created new instances of JobWrappers. -type JobWrapperFactory func() JobWrapper - -// WrapAllSeparately wraps the given jobs using the given JobWrapperFactory instances. -// This enables us to use a different JobWrapper for the jobs passed in, but recursively apply -// the same wrapper to their children. -func WrapAllSeparately(jobs []Job, factories ...JobWrapperFactory) []Job { - var wrapped = make([]Job, 0, len(jobs)) - for _, j := range jobs { - for _, factory := range factories { - wrapper := factory() - j = Wrap(j, wrapper) - } - wrapped = append(wrapped, j) - } - return wrapped -} - // Wrap wraps the given Job and also any continuations with the given JobWrapper. func Wrap(job Job, wrapper JobWrapper) Job { return func(event *beat.Event) ([]Job, error) { diff --git a/heartbeat/monitors/jobs/testing.go b/heartbeat/monitors/jobs/testing.go index 2eed89a587d5..a28ca7a8002c 100644 --- a/heartbeat/monitors/jobs/testing.go +++ b/heartbeat/monitors/jobs/testing.go @@ -25,6 +25,7 @@ import ( // ExecJobsAndConts recursively executes multiple jobs. func ExecJobsAndConts(t *testing.T, jobs []Job) ([]*beat.Event, error) { + t.Helper() var results []*beat.Event for _, j := range jobs { resultEvents, err := ExecJobAndConts(t, j) @@ -39,21 +40,17 @@ func ExecJobsAndConts(t *testing.T, jobs []Job) ([]*beat.Event, error) { // ExecJobAndConts will recursively execute a job and gather its results func ExecJobAndConts(t *testing.T, j Job) ([]*beat.Event, error) { + t.Helper() var results []*beat.Event event := &beat.Event{} results = append(results, event) cont, err := j(event) - if err != nil { - return nil, err - } for _, cj := range cont { - cjResults, err := ExecJobAndConts(t, cj) - if err != nil { - return nil, err - } + var cjResults []*beat.Event + cjResults, err = ExecJobAndConts(t, cj) results = append(results, cjResults...) } - return results, nil + return results, err } diff --git a/heartbeat/monitors/mocks.go b/heartbeat/monitors/mocks.go index 9a5ba6a3b3ad..0a7227c9986e 100644 --- a/heartbeat/monitors/mocks.go +++ b/heartbeat/monitors/mocks.go @@ -197,7 +197,7 @@ func mockEventMonitorValidator(id string, name string) validator.Validator { return lookslike.Strict(lookslike.Compose( baseMockEventMonitorValidator(id, name, "up"), hbtestllext.MonitorTimespanValidator, - hbtest.SummaryChecks(1, 0), + hbtest.SummaryStateChecks(1, 0), lookslike.MustCompile(mockEventCustomFields()), )) } diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index 3d9823ea88ff..6a16c7d3f303 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -69,6 +69,8 @@ type Monitor struct { // stats is the countersRecorder used to record lifecycle events // for global metrics + telemetry stats plugin.RegistryRecorder + + monitorStateTracker *monitorstate.Tracker } // String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe @@ -124,15 +126,16 @@ func newMonitorUnsafe( } m := &Monitor{ - stdFields: standardFields, - pluginName: pluginFactory.Name, - addTask: addTask, - configuredJobs: []*configuredJob{}, - pubClient: pubClient, - internalsMtx: sync.Mutex{}, - config: config, - stats: pluginFactory.Stats, - state: MON_INIT, + stdFields: standardFields, + pluginName: pluginFactory.Name, + addTask: addTask, + configuredJobs: []*configuredJob{}, + pubClient: pubClient, + internalsMtx: sync.Mutex{}, + config: config, + stats: pluginFactory.Stats, + state: MON_INIT, + monitorStateTracker: monitorstate.NewTracker(stateLoader, false), } if m.stdFields.ID == "" { @@ -178,7 +181,10 @@ func newMonitorUnsafe( // We need to use the lightweight wrapping for error jobs // since browser wrapping won't write summaries, but the fake job here is // effectively a lightweight job - wrappedJobs = wrappers.WrapLightweight(p.Jobs, m.stdFields, monitorstate.NewTracker(stateLoader, false)) + m.stdFields.BadConfig = true + // No need to retry bad configs + m.stdFields.MaxAttempts = 1 + wrappedJobs = wrappers.WrapCommon(p.Jobs, m.stdFields, stateLoader) } m.endpoints = p.Endpoints diff --git a/heartbeat/monitors/stdfields/stdfields.go b/heartbeat/monitors/stdfields/stdfields.go index d0773a49e798..ae721a746111 100644 --- a/heartbeat/monitors/stdfields/stdfields.go +++ b/heartbeat/monitors/stdfields/stdfields.go @@ -40,6 +40,7 @@ type StdMonitorFields struct { Service ServiceFields `config:"service"` Origin string `config:"origin"` LegacyServiceName string `config:"service_name"` + MaxAttempts uint16 `config:"max_attempts"` // Used by zip_url and local monitors // kibana originating monitors only run one journey at a time // and just use the `fields` syntax / manually set monitor IDs @@ -51,10 +52,13 @@ type StdMonitorFields struct { Local *config.C `config:"local"` } `config:"source"` RunFrom *hbconfig.LocationWithID `config:"run_from"` + // Set to true by monitor.go if monitor configuration is unrunnable + // Maybe there's a more elegant way to handle this + BadConfig bool } func ConfigToStdMonitorFields(conf *config.C) (StdMonitorFields, error) { - sFields := StdMonitorFields{Enabled: true} + sFields := StdMonitorFields{Enabled: true, MaxAttempts: 1} if err := conf.Unpack(&sFields); err != nil { return sFields, fmt.Errorf("error unpacking monitor plugin config: %w", err) diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index 038a6c5baa2f..ee0839fe14e5 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -84,8 +84,7 @@ func (t *configuredJob) Start(pubClient beat.Client) { return } - tf := t.makeSchedulerTaskFunc() //nolint:typecheck // this is used, linter just doesn't seem to see it - t.cancelFn, err = t.monitor.addTask(t.config.Schedule, t.monitor.stdFields.ID, tf, t.config.Type) + t.cancelFn, err = t.monitor.addTask(t.config.Schedule, t.monitor.stdFields.ID, t.makeSchedulerTaskFunc(), t.config.Type) if err != nil { logp.L().Info("could not start monitor: %v", err) } diff --git a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go index a35ddcccb874..42b1c6c31c31 100644 --- a/heartbeat/monitors/wrappers/monitorstate/esloader_test.go +++ b/heartbeat/monitors/wrappers/monitorstate/esloader_test.go @@ -51,7 +51,7 @@ func TestStatesESLoader(t *testing.T) { monID := etc.createTestMonitorStateInES(t, testStatus) // Since we've continued this state it should register the initial state - ms := etc.tracker.getCurrentState(monID) + ms := etc.tracker.GetCurrentState(monID) require.True(t, ms.StartedAt.After(testStart.Add(-time.Nanosecond)), "timestamp for new state is off") requireMSStatusCount(t, ms, testStatus, 1) @@ -59,7 +59,7 @@ func TestStatesESLoader(t *testing.T) { count := FlappingThreshold * 2 var lastId string for i := 0; i < count; i++ { - ms = etc.tracker.RecordStatus(monID, testStatus) + ms = etc.tracker.RecordStatus(monID, testStatus, true) if i == 0 { lastId = ms.ID } @@ -77,7 +77,7 @@ func TestStatesESLoader(t *testing.T) { origMsId := ms.ID for i := 0; i < count; i++ { - ms = etc.tracker.RecordStatus(monID, testStatus) + ms = etc.tracker.RecordStatus(monID, testStatus, true) require.NotEqual(t, origMsId, ms.ID) if i == 0 { lastId = ms.ID diff --git a/heartbeat/monitors/wrappers/monitorstate/monitorstate.go b/heartbeat/monitors/wrappers/monitorstate/monitorstate.go index c39afd2df724..56034d7ab4fd 100644 --- a/heartbeat/monitors/wrappers/monitorstate/monitorstate.go +++ b/heartbeat/monitors/wrappers/monitorstate/monitorstate.go @@ -37,6 +37,8 @@ const ( StatusUp StateStatus = "up" StatusDown StateStatus = "down" StatusFlapping StateStatus = "flap" + // Nil, essentially + StatusEmpty StateStatus = "" ) func newMonitorState(sf stdfields.StdMonitorFields, status StateStatus, ctr int, flappingEnabled bool) *State { @@ -52,7 +54,7 @@ func newMonitorState(sf stdfields.StdMonitorFields, status StateStatus, ctr int, flappingEnabled: flappingEnabled, ctr: ctr + 1, } - ms.recordCheck(sf, status) + ms.recordCheck(sf, status, false) return ms } @@ -111,7 +113,7 @@ func (s *State) truncateFlapHistory() { // If the current state is continued it just updates counters and other record keeping, // if the state ends it actually swaps out the full value the state points to // and sets state.Ends. -func (s *State) recordCheck(sf stdfields.StdMonitorFields, newStatus StateStatus) { +func (s *State) recordCheck(sf stdfields.StdMonitorFields, newStatus StateStatus, isFinalAttempt bool) { if s.Status == StatusFlapping { s.truncateFlapHistory() @@ -124,14 +126,16 @@ func (s *State) recordCheck(sf stdfields.StdMonitorFields, newStatus StateStatus } } - if !hasStabilized { // continue flapping + if !hasStabilized || !isFinalAttempt { // continue flapping // Use the new flap history as part of the state s.FlapHistory = append(s.FlapHistory, newStatus) s.incrementCounters(newStatus) } else { // flap has ended s.transitionTo(sf, newStatus) } - } else if s.Status == newStatus { // stable state, status has not changed + // stable state, status has not changed + // or this will be retried, so no state change yet + } else if s.Status == newStatus || !isFinalAttempt { // The state is stable, no changes needed s.incrementCounters(newStatus) } else if s.Checks < FlappingThreshold && s.flappingEnabled { @@ -178,5 +182,6 @@ func LoaderDBKey(sf stdfields.StdMonitorFields, at time.Time, ctr int) string { rfid = normalizeRunFromIDRegexp.ReplaceAllString(sf.RunFrom.ID, "_") } - return fmt.Sprintf("%s-%x-%x", rfid, at.UnixMilli(), ctr) + key := fmt.Sprintf("%s-%x-%x", rfid, at.UnixMilli(), ctr) + return key } diff --git a/heartbeat/monitors/wrappers/monitorstate/monitorstate_test.go b/heartbeat/monitors/wrappers/monitorstate/monitorstate_test.go index f60ddb2a40f6..5a7f4f2d6e1b 100644 --- a/heartbeat/monitors/wrappers/monitorstate/monitorstate_test.go +++ b/heartbeat/monitors/wrappers/monitorstate/monitorstate_test.go @@ -44,16 +44,16 @@ func TestRecordingAndFlapping(t *testing.T) { require.Nil(t, ms.Ends, "expected nil ends after a stable series") // Since we're now in a stable state a single up check should create a new state from a stable one - ms.recordCheck(TestSf, StatusUp) + ms.recordCheck(TestSf, StatusUp, true) require.Equal(t, StatusUp, ms.Status) requireMSCounts(t, ms, 1, 0) } func TestDuration(t *testing.T) { ms := newMonitorState(TestSf, StatusUp, 0, true) - ms.recordCheck(TestSf, StatusUp) + ms.recordCheck(TestSf, StatusUp, true) time.Sleep(time.Millisecond * 10) - ms.recordCheck(TestSf, StatusUp) + ms.recordCheck(TestSf, StatusUp, true) // Pretty forgiving upper bound to account for flaky CI require.True(t, ms.DurationMs > 9 && ms.DurationMs < 900, "Expected duration to be ~10ms, got %d", ms.DurationMs) } @@ -62,9 +62,9 @@ func TestDuration(t *testing.T) { func recordFlappingSeries(TestSf stdfields.StdMonitorFields, ms *State) { for i := 0; i < FlappingThreshold; i++ { if i%2 == 0 { - ms.recordCheck(TestSf, StatusUp) + ms.recordCheck(TestSf, StatusUp, true) } else { - ms.recordCheck(TestSf, StatusDown) + ms.recordCheck(TestSf, StatusDown, true) } } } @@ -72,7 +72,7 @@ func recordFlappingSeries(TestSf stdfields.StdMonitorFields, ms *State) { // recordStableSeries is a test helper for repeatedly recording one status func recordStableSeries(TestSf stdfields.StdMonitorFields, ms *State, count int, s StateStatus) { for i := 0; i < count; i++ { - ms.recordCheck(TestSf, s) + ms.recordCheck(TestSf, s, true) } } diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker.go b/heartbeat/monitors/wrappers/monitorstate/tracker.go index d3e9d7ff178c..03909d55aa83 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker.go @@ -56,25 +56,33 @@ type Tracker struct { // other than ES if necessary type StateLoader func(stdfields.StdMonitorFields) (*State, error) -func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateStatus) (ms *State) { +func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateStatus, isFinalAttempt bool) (ms *State) { //note: the return values have no concurrency controls, they may be unsafely read unless //copied to the stack, copying the structs before returning t.mtx.Lock() defer t.mtx.Unlock() - state := t.getCurrentState(sf) + state := t.GetCurrentState(sf) if state == nil { state = newMonitorState(sf, newStatus, 0, t.flappingEnabled) logp.L().Infof("initializing new state for monitor %s: %s", sf.ID, state.String()) t.states[sf.ID] = state } else { - state.recordCheck(sf, newStatus) + state.recordCheck(sf, newStatus, isFinalAttempt) } // return a copy since the state itself is a pointer that is frequently mutated return state.copy() } -func (t *Tracker) getCurrentState(sf stdfields.StdMonitorFields) (state *State) { +func (t *Tracker) GetCurrentStatus(sf stdfields.StdMonitorFields) StateStatus { + s := t.GetCurrentState(sf) + if s == nil { + return StatusEmpty + } + return s.Status +} + +func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields) (state *State) { if state, ok := t.states[sf.ID]; ok { return state } diff --git a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go index a1221ecc0723..ec1217b86150 100644 --- a/heartbeat/monitors/wrappers/monitorstate/tracker_test.go +++ b/heartbeat/monitors/wrappers/monitorstate/tracker_test.go @@ -28,43 +28,43 @@ import ( func TestTrackerRecord(t *testing.T) { mst := NewTracker(NilStateLoader, true) - ms := mst.RecordStatus(TestSf, StatusUp) + ms := mst.RecordStatus(TestSf, StatusUp, true) require.Equal(t, StatusUp, ms.Status) requireMSStatusCount(t, ms, StatusUp, 1) for i := 0; i < FlappingThreshold; i++ { - _ = mst.RecordStatus(TestSf, StatusDown) - ms = mst.RecordStatus(TestSf, StatusUp) + _ = mst.RecordStatus(TestSf, StatusDown, true) + ms = mst.RecordStatus(TestSf, StatusUp, true) } require.Equal(t, StatusFlapping, ms.Status) requireMSCounts(t, ms, FlappingThreshold+1, FlappingThreshold) // Restore stable state for i := 0; i < FlappingThreshold; i++ { - _ = mst.RecordStatus(TestSf, StatusDown) + _ = mst.RecordStatus(TestSf, StatusDown, true) } - ms = mst.RecordStatus(TestSf, StatusDown) + ms = mst.RecordStatus(TestSf, StatusDown, true) require.Equal(t, StatusDown, ms.Status) requireMSStatusCount(t, ms, StatusDown, FlappingThreshold-1) } func TestTrackerRecordFlappingDisabled(t *testing.T) { mst := NewTracker(NilStateLoader, false) - ms := mst.RecordStatus(TestSf, StatusUp) + ms := mst.RecordStatus(TestSf, StatusUp, true) require.Equal(t, StatusUp, ms.Status) requireMSStatusCount(t, ms, StatusUp, 1) for i := 0; i < FlappingThreshold; i++ { - _ = mst.RecordStatus(TestSf, StatusDown) - ms = mst.RecordStatus(TestSf, StatusUp) + _ = mst.RecordStatus(TestSf, StatusDown, true) + ms = mst.RecordStatus(TestSf, StatusUp, true) } // with flapping disabled it only shows as up require.Equal(t, StatusUp, ms.Status) requireMSCounts(t, ms, 1, 0) - ms = mst.RecordStatus(TestSf, StatusDown) + ms = mst.RecordStatus(TestSf, StatusDown, true) require.Equal(t, StatusDown, ms.Status) requireMSStatusCount(t, ms, StatusDown, 1) } diff --git a/heartbeat/monitors/wrappers/summarizer/summarizer.go b/heartbeat/monitors/wrappers/summarizer/summarizer.go new file mode 100644 index 000000000000..49d3ca9422ad --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/summarizer.go @@ -0,0 +1,167 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package summarizer + +import ( + "fmt" + "sync" + "time" + + "github.com/gofrs/uuid" + + "github.com/elastic/beats/v7/heartbeat/eventext" + "github.com/elastic/beats/v7/heartbeat/monitors/jobs" + "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +type Summarizer struct { + rootJob jobs.Job + contsRemaining uint16 + mtx *sync.Mutex + jobSummary *JobSummary + checkGroup string + stateTracker *monitorstate.Tracker + sf stdfields.StdMonitorFields + retryDelay time.Duration +} + +type JobSummary struct { + Attempt uint16 `json:"attempt"` + MaxAttempts uint16 `json:"max_attempts"` + FinalAttempt bool `json:"final_attempt"` + Up uint16 `json:"up"` + Down uint16 `json:"down"` + Status monitorstate.StateStatus `json:"status"` + RetryGroup string `json:"retry_group"` +} + +func NewSummarizer(rootJob jobs.Job, sf stdfields.StdMonitorFields, mst *monitorstate.Tracker) *Summarizer { + uu, err := uuid.NewV1() + if err != nil { + logp.L().Errorf("could not create v1 UUID for retry group: %s", err) + } + return &Summarizer{ + rootJob: rootJob, + contsRemaining: 1, + mtx: &sync.Mutex{}, + jobSummary: NewJobSummary(1, sf.MaxAttempts, uu.String()), + checkGroup: uu.String(), + stateTracker: mst, + sf: sf, + // private property, but can be overridden in tests to speed them up + retryDelay: time.Second, + } +} + +func NewJobSummary(attempt uint16, maxAttempts uint16, retryGroup string) *JobSummary { + if maxAttempts < 1 { + maxAttempts = 1 + } + + return &JobSummary{ + MaxAttempts: maxAttempts, + Attempt: attempt, + RetryGroup: retryGroup, + } +} + +// Wrap wraps the given job in such a way that the last event summarizes all previous events +// and additionally adds some common fields like monitor.check_group to all events. +// This adds the state and summary top level fields. +func (s *Summarizer) Wrap(j jobs.Job) jobs.Job { + return func(event *beat.Event) ([]jobs.Job, error) { + conts, jobErr := j(event) + + _, _ = event.PutValue("monitor.check_group", fmt.Sprintf("%s-%d", s.checkGroup, s.jobSummary.Attempt)) + + s.mtx.Lock() + defer s.mtx.Unlock() + + js := s.jobSummary + + s.contsRemaining-- // we just ran one cont, discount it + // these many still need to be processed + s.contsRemaining += uint16(len(conts)) + + monitorStatus, err := event.GetValue("monitor.status") + if err == nil && !eventext.IsEventCancelled(event) { // if this event contains a status... + mss := monitorstate.StateStatus(monitorStatus.(string)) + + if mss == monitorstate.StatusUp { + js.Up++ + } else { + js.Down++ + } + } + + if s.contsRemaining == 0 { + if js.Down > 0 { + js.Status = monitorstate.StatusDown + } else { + js.Status = monitorstate.StatusUp + } + + // Get the last status of this monitor, we use this later to + // determine if a retry is needed + lastStatus := s.stateTracker.GetCurrentStatus(s.sf) + + // FinalAttempt is true if no retries will occur + js.FinalAttempt = js.Status != monitorstate.StatusDown || js.Attempt >= js.MaxAttempts + + ms := s.stateTracker.RecordStatus(s.sf, js.Status, js.FinalAttempt) + + eventext.MergeEventFields(event, mapstr.M{ + "summary": js, + "state": ms, + }) + + logp.L().Debugf("attempt info: %v == %v && %d < %d", js.Status, lastStatus, js.Attempt, js.MaxAttempts) + if !js.FinalAttempt { + // Reset the job summary for the next attempt + // We preserve `s` across attempts + s.jobSummary = NewJobSummary(js.Attempt+1, js.MaxAttempts, js.RetryGroup) + s.contsRemaining = 1 + + // Delay retries by 1s for two reasons: + // 1. Since ES timestamps are millisecond resolution they can happen so fast + // that it's hard to tell the sequence in which jobs executed apart in our + // kibana queries + // 2. If the site error is very short 1s gives it a tiny bit of time to recover + delayedRootJob := jobs.Wrap(s.rootJob, func(j jobs.Job) jobs.Job { + return func(event *beat.Event) ([]jobs.Job, error) { + time.Sleep(s.retryDelay) + return j(event) + } + }) + conts = []jobs.Job{delayedRootJob} + } + } + + // Wrap downstream jobs using the same state object this lets us create new state + // on the first job, but re-use that same object on continuations. + for i, cont := range conts { + conts[i] = s.Wrap(cont) + } + + return conts, jobErr + } +} diff --git a/heartbeat/monitors/wrappers/summarizer/summarizer_test.go b/heartbeat/monitors/wrappers/summarizer/summarizer_test.go new file mode 100644 index 000000000000..de86cd7b49a7 --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/summarizer_test.go @@ -0,0 +1,181 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package summarizer + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/heartbeat/monitors/jobs" + "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +func TestSummarizer(t *testing.T) { + t.Parallel() + charToStatus := func(c uint8) monitorstate.StateStatus { + if c == 'u' { + return monitorstate.StatusUp + } else { + return monitorstate.StatusDown + } + } + + // these tests use strings to describe sequences of events + tests := []struct { + name string + maxAttempts int + // The sequence of up down states the monitor should emit + // Equivalent to monitor.status + statusSequence string + // The expected states on each event + expectedStates string + // the attempt number of the given event + expectedAttempts string + }{ + { + "start down, transition to up", + 2, + "du", + "du", + "12", + }, + { + "start up, stay up", + 2, + "uuuuuuuu", + "uuuuuuuu", + "11111111", + }, + { + "start down, stay down", + 2, + "dddddddd", + "dddddddd", + "12121212", + }, + { + "start up - go down with one retry - thenrecover", + 2, + "udddduuu", + "uuddduuu", + "11212111", + }, + { + "start up, transient down, recover", + 2, + "uuuduuuu", + "uuuuuuuu", + "11112111", + }, + { + "start up, multiple transient down, recover", + 2, + "uuudududu", + "uuuuuuuuu", + "111121212", + }, + { + "no retries, single down", + 1, + "uuuduuuu", + "uuuduuuu", + "11111111", + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + dummyErr := fmt.Errorf("dummyerr") + + // The job runs through each char in the status sequence and + // returns an error if it's set to 'd' + pos := 0 + job := func(event *beat.Event) (j []jobs.Job, retErr error) { + status := charToStatus(tt.statusSequence[pos]) + if status == monitorstate.StatusDown { + retErr = dummyErr + } + event.Fields = mapstr.M{ + "monitor": mapstr.M{ + "id": "test", + "status": string(status), + }, + } + + pos++ + return nil, retErr + } + + tracker := monitorstate.NewTracker(monitorstate.NilStateLoader, false) + sf := stdfields.StdMonitorFields{ID: "testmon", Name: "testmon", MaxAttempts: uint16(tt.maxAttempts)} + + rcvdStatuses := "" + rcvdStates := "" + rcvdAttempts := "" + i := 0 + var lastSummary *JobSummary + for { + s := NewSummarizer(job, sf, tracker) + // Shorten retry delay to make tests run faster + s.retryDelay = 2 * time.Millisecond + wrapped := s.Wrap(job) + events, _ := jobs.ExecJobAndConts(t, wrapped) + for _, event := range events { + eventStatus, _ := event.GetValue("monitor.status") + eventStatusStr := eventStatus.(string) + rcvdStatuses += eventStatusStr[:1] + state, _ := event.GetValue("state") + if state != nil { + rcvdStates += string(state.(*monitorstate.State).Status)[:1] + } else { + rcvdStates += "_" + } + summaryIface, _ := event.GetValue("summary") + summary := summaryIface.(*JobSummary) + + if summary == nil { + rcvdAttempts += "!" + } else if lastSummary != nil { + if summary.Attempt > 1 { + require.Equal(t, lastSummary.RetryGroup, summary.RetryGroup) + } else { + require.NotEqual(t, lastSummary.RetryGroup, summary.RetryGroup) + } + } + rcvdAttempts += fmt.Sprintf("%d", summary.Attempt) + lastSummary = summary + } + i += len(events) + if i >= len(tt.statusSequence) { + break + } + } + require.Equal(t, tt.statusSequence, rcvdStatuses) + require.Equal(t, tt.expectedStates, rcvdStates) + require.Equal(t, tt.expectedAttempts, rcvdAttempts) + }) + } +} diff --git a/heartbeat/monitors/wrappers/summarizer/summarizertesthelper/testhelper.go b/heartbeat/monitors/wrappers/summarizer/summarizertesthelper/testhelper.go new file mode 100644 index 000000000000..def27bde0b0e --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/summarizertesthelper/testhelper.go @@ -0,0 +1,56 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package summarizertesthelper + +// summarizertest exists to provide a helper function +// for the summarizer. We need a separate package to +// prevent import cycles. + +import ( + "fmt" + + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer" + "github.com/elastic/go-lookslike" + "github.com/elastic/go-lookslike/isdef" + "github.com/elastic/go-lookslike/llpath" + "github.com/elastic/go-lookslike/llresult" + "github.com/elastic/go-lookslike/validator" +) + +// This duplicates hbtest.SummaryChecks to avoid an import cycle. +// It could be refactored out, but it just isn't worth it. +func SummaryValidator(up uint16, down uint16) validator.Validator { + return lookslike.MustCompile(map[string]interface{}{ + "summary": summaryIsdef(up, down), + }) +} + +func summaryIsdef(up uint16, down uint16) isdef.IsDef { + return isdef.Is("summary", func(path llpath.Path, v interface{}) *llresult.Results { + js, ok := v.(summarizer.JobSummary) + if !ok { + return llresult.SimpleResult(path, false, fmt.Sprintf("expected a *JobSummary, got %v", v)) + } + + if js.Up != up || js.Down != down { + return llresult.SimpleResult(path, false, fmt.Sprintf("expected up/down to be %d/%d, got %d/%d", up, down, js.Up, js.Down)) + } + + return llresult.ValidResult(path) + }) +} diff --git a/heartbeat/monitors/wrappers/wrappers.go b/heartbeat/monitors/wrappers/wrappers.go index b4ba0c344e5c..233effa0acec 100644 --- a/heartbeat/monitors/wrappers/wrappers.go +++ b/heartbeat/monitors/wrappers/wrappers.go @@ -20,10 +20,8 @@ package wrappers import ( "errors" "fmt" - "sync" "time" - "github.com/gofrs/uuid" "github.com/mitchellh/hashstructure" "github.com/elastic/elastic-agent-libs/logp" @@ -36,42 +34,44 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/logger" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer" "github.com/elastic/beats/v7/heartbeat/scheduler/schedule" "github.com/elastic/beats/v7/libbeat/beat" ) // WrapCommon applies the common wrappers that all monitor jobs get. func WrapCommon(js []jobs.Job, stdMonFields stdfields.StdMonitorFields, stateLoader monitorstate.StateLoader) []jobs.Job { - // flapping is disabled by default until we sort out how it should work mst := monitorstate.NewTracker(stateLoader, false) - if stdMonFields.Type == "browser" { - return WrapBrowser(js, stdMonFields, mst) + var wrapped []jobs.Job + if stdMonFields.Type != "browser" || stdMonFields.BadConfig { + wrapped = WrapLightweight(js, stdMonFields, mst) } else { - return WrapLightweight(js, stdMonFields, mst) + wrapped = WrapBrowser(js, stdMonFields, mst) } + // Wrap just the root jobs with the summarizer + // The summarizer itself wraps the continuations in a stateful way + for i, j := range wrapped { + j := j + wrapped[i] = func(event *beat.Event) ([]jobs.Job, error) { + s := summarizer.NewSummarizer(j, stdMonFields, mst) + return s.Wrap(j)(event) + } + } + return wrapped } // WrapLightweight applies to http/tcp/icmp, everything but journeys involving node func WrapLightweight(js []jobs.Job, stdMonFields stdfields.StdMonitorFields, mst *monitorstate.Tracker) []jobs.Job { - return jobs.WrapAllSeparately( - jobs.WrapAll( - js, - addMonitorTimespan(stdMonFields), - addServiceName(stdMonFields), - addMonitorMeta(stdMonFields, len(js) > 1), - addMonitorStatus(nil), - addMonitorErr, - addMonitorDuration, - logMonitorRun(nil), - ), - func() jobs.JobWrapper { - return makeAddSummary() - }, - func() jobs.JobWrapper { - return addMonitorState(stdMonFields, mst) - }, + return jobs.WrapAll( + js, + addMonitorTimespan(stdMonFields), + addServiceName(stdMonFields), + addMonitorMeta(stdMonFields, len(js) > 1), + addMonitorStatus(nil), + addMonitorErr, + addMonitorDuration, + logMonitorRun(nil), ) - } // WrapBrowser is pretty minimal in terms of fields added. The browser monitor @@ -85,37 +85,10 @@ func WrapBrowser(js []jobs.Job, stdMonFields stdfields.StdMonitorFields, mst *mo addMonitorMeta(stdMonFields, false), addMonitorStatus(byEventType("heartbeat/summary")), addMonitorErr, - addBrowserSummary(stdMonFields, byEventType("heartbeat/summary")), - addMonitorState(stdMonFields, mst), logMonitorRun(byEventType("heartbeat/summary")), ) } -// addMonitorState computes the various state fields -func addMonitorState(sf stdfields.StdMonitorFields, mst *monitorstate.Tracker) jobs.JobWrapper { - return func(job jobs.Job) jobs.Job { - return func(event *beat.Event) ([]jobs.Job, error) { - cont, err := job(event) - - hasSummary, _ := event.Fields.HasKey("summary.up") - if !hasSummary { - return cont, err - } - - status, err := event.GetValue("monitor.status") - if err != nil { - return nil, fmt.Errorf("could not wrap state for '%s', no status assigned: %w", sf.ID, err) - } - - ms := mst.RecordStatus(sf, monitorstate.StateStatus(status.(string))) - - eventext.MergeEventFields(event, mapstr.M{"state": ms}) - - return cont, nil - } - } -} - // addMonitorMeta adds the id, name, and type fields to the monitor. func addMonitorMeta(sFields stdfields.StdMonitorFields, hashURLIntoID bool) jobs.JobWrapper { return func(job jobs.Job) jobs.Job { @@ -279,113 +252,6 @@ func logMonitorRun(match EventMatcher) jobs.JobWrapper { } } -// makeAddSummary summarizes the job, adding the `summary` field to the last event emitted. -func makeAddSummary() jobs.JobWrapper { - // This is a tricky method. The way this works is that we track the state across jobs in the - // state struct here. - state := struct { - mtx sync.Mutex - monitorId string - remaining uint16 - up uint16 - down uint16 - checkGroup string - generation uint64 - }{ - mtx: sync.Mutex{}, - } - // Note this is not threadsafe, must be called from a mutex - resetState := func() { - state.remaining = 1 - state.up = 0 - state.down = 0 - state.generation++ - u, err := uuid.NewV1() - if err != nil { - panic(fmt.Sprintf("cannot generate UUIDs on this system: %s", err)) - } - state.checkGroup = u.String() - } - resetState() - - return func(job jobs.Job) jobs.Job { - return func(event *beat.Event) ([]jobs.Job, error) { - cont, jobErr := job(event) - state.mtx.Lock() - defer state.mtx.Unlock() - - // If the event is cancelled we don't record it as being either up or down since - // we discard the event anyway. - var eventStatus interface{} - if !eventext.IsEventCancelled(event) { - // After each job - eventStatus, _ = event.GetValue("monitor.status") - if eventStatus == "up" { - state.up++ - } else { - state.down++ - } - } - - _, _ = event.PutValue("monitor.check_group", state.checkGroup) - - // Adjust the total remaining to account for new continuations - state.remaining += uint16(len(cont)) - // Reduce total remaining to account for the just executed job - state.remaining-- - - // After last job - if state.remaining == 0 { - up := state.up - down := state.down - - eventext.MergeEventFields(event, mapstr.M{ - "summary": mapstr.M{ - "up": up, - "down": down, - }, - }) - resetState() - } - - return cont, jobErr - } - } -} - -type EventMatcher func(event *beat.Event) bool - -func addBrowserSummary(sf stdfields.StdMonitorFields, match EventMatcher) jobs.JobWrapper { - return func(job jobs.Job) jobs.Job { - return func(event *beat.Event) ([]jobs.Job, error) { - cont, jobErr := job(event) - - if match != nil && !match(event) { - return cont, jobErr - } - - status, err := event.GetValue("monitor.status") - if err != nil { - return nil, fmt.Errorf("could not wrap summary for '%s', no status assigned: %w", sf.ID, err) - } - - up, down := 1, 0 - if monitorstate.StateStatus(status.(string)) == monitorstate.StatusDown { - up, down = 0, 1 - } - - eventext.MergeEventFields(event, mapstr.M{ - "summary": mapstr.M{ - "up": up, - "down": down, - }, - }) - - return cont, jobErr - } - } -} - func byEventType(t string) func(event *beat.Event) bool { return func(event *beat.Event) bool { eventType, err := event.Fields.GetValue("event.type") @@ -396,3 +262,5 @@ func byEventType(t string) func(event *beat.Event) bool { return eventType == t } } + +type EventMatcher func(event *beat.Event) bool diff --git a/heartbeat/monitors/wrappers/wrappers_test.go b/heartbeat/monitors/wrappers/wrappers_test.go index ff1acd84b350..4ebc653d8fc8 100644 --- a/heartbeat/monitors/wrappers/wrappers_test.go +++ b/heartbeat/monitors/wrappers/wrappers_test.go @@ -43,6 +43,9 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/jobs" "github.com/elastic/beats/v7/heartbeat/monitors/logger" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/summarizertesthelper" "github.com/elastic/beats/v7/heartbeat/scheduler/schedule" "github.com/elastic/beats/v7/libbeat/beat" ) @@ -57,11 +60,12 @@ type testDef struct { } var testMonFields = stdfields.StdMonitorFields{ - ID: "myid", - Name: "myname", - Type: "mytype", - Schedule: schedule.MustParse("@every 1s"), - Timeout: 1, + ID: "myid", + Name: "myname", + Type: "mytype", + Schedule: schedule.MustParse("@every 1s"), + Timeout: 1, + MaxAttempts: 1, } var testBrowserMonFields = stdfields.StdMonitorFields{ @@ -71,6 +75,7 @@ var testBrowserMonFields = stdfields.StdMonitorFields{ } func testCommonWrap(t *testing.T, tt testDef) { + t.Helper() t.Run(tt.name, func(t *testing.T) { wrapped := WrapCommon(tt.jobs, tt.sFields, nil) @@ -82,7 +87,7 @@ func testCommonWrap(t *testing.T, tt testDef) { results, err := jobs.ExecJobsAndConts(t, wrapped) assert.NoError(t, err) - require.Equal(t, len(results), len(tt.want), "Expected test def wants to correspond exactly to number results.") + assert.Len(t, results, len(tt.want), "Expected test def wants to correspond exactly to number results.") for idx, r := range results { t.Run(fmt.Sprintf("result at index %d", idx), func(t *testing.T) { @@ -123,7 +128,7 @@ func TestSimpleJob(t *testing.T) { }), hbtestllext.MonitorTimespanValidator, stateValidator(), - summaryValidator(1, 0), + summarizertesthelper.SummaryValidator(1, 0), )}, nil, func(t *testing.T, results []*beat.Event, observed []observer.LoggedEntry) { @@ -201,7 +206,7 @@ func TestAdditionalStdFields(t *testing.T) { }), stateValidator(), hbtestllext.MonitorTimespanValidator, - summaryValidator(1, 0), + summarizertesthelper.SummaryValidator(1, 0), )}, nil, nil, @@ -239,7 +244,7 @@ func TestErrorJob(t *testing.T) { lookslike.Compose( errorJobValidator, hbtestllext.MonitorTimespanValidator, - summaryValidator(0, 1), + summarizertesthelper.SummaryValidator(0, 1), )}, nil, nil, @@ -264,7 +269,7 @@ func TestMultiJobNoConts(t *testing.T) { }), stateValidator(), hbtestllext.MonitorTimespanValidator, - summaryValidator(1, 0), + summarizertesthelper.SummaryValidator(1, 0), ) } @@ -319,17 +324,20 @@ func TestMultiJobConts(t *testing.T) { testCommonWrap(t, testDef{ "multi-job-continuations", testMonFields, - []jobs.Job{makeContJob(t, "http://foo.com"), makeContJob(t, "http://bar.com")}, + []jobs.Job{ + makeContJob(t, "http://foo.com"), + makeContJob(t, "http://bar.com"), + }, []validator.Validator{ contJobValidator("http://foo.com", "1st"), lookslike.Compose( contJobValidator("http://foo.com", "2nd"), - summaryValidator(2, 0), + summarizertesthelper.SummaryValidator(2, 0), ), contJobValidator("http://bar.com", "1st"), lookslike.Compose( contJobValidator("http://bar.com", "2nd"), - summaryValidator(2, 0), + summarizertesthelper.SummaryValidator(2, 0), ), }, nil, @@ -337,6 +345,132 @@ func TestMultiJobConts(t *testing.T) { }) } +func TestRetryMultiCont(t *testing.T) { + uniqScope := isdef.ScopedIsUnique() + + expected := []struct { + monStatus string + js summarizer.JobSummary + state monitorstate.State + }{ + { + "down", + summarizer.JobSummary{ + Status: "down", + FinalAttempt: true, + // we expect two up since this is a lightweight + // job and all events get a monitor status + // since no errors are returned that's 2 + Up: 0, + Down: 2, + Attempt: 1, + MaxAttempts: 2, + }, + monitorstate.State{ + Status: "down", + Up: 0, + Down: 2, + Checks: 2, + }, + }, + { + "down", + summarizer.JobSummary{ + Status: "down", + FinalAttempt: true, + Up: 0, + Down: 2, + Attempt: 2, + MaxAttempts: 2, + }, + monitorstate.State{ + Status: "down", + Up: 0, + Down: 2, + Checks: 2, + }, + }, + } + + jobErr := fmt.Errorf("down") + + makeContJob := func(t *testing.T, u string) jobs.Job { + expIdx := 0 + return func(event *beat.Event) ([]jobs.Job, error) { + eventext.MergeEventFields(event, mapstr.M{"cont": "1st"}) + u, err := url.Parse(u) + require.NoError(t, err) + eventext.MergeEventFields(event, mapstr.M{"url": URLFields(u)}) + + return []jobs.Job{ + func(event *beat.Event) ([]jobs.Job, error) { + eventext.MergeEventFields(event, mapstr.M{"cont": "2nd"}) + eventext.MergeEventFields(event, mapstr.M{"url": URLFields(u)}) + + expIdx++ + if expIdx >= len(expected)-1 { + expIdx = 0 + } + exp := expected[expIdx] + if exp.js.Status == "down" { + return nil, jobErr + } + + return nil, nil + }, + }, jobErr + } + } + + contJobValidator := func(u string, msg string) validator.Validator { + return lookslike.Compose( + urlValidator(t, u), + lookslike.MustCompile(map[string]interface{}{"cont": msg}), + lookslike.MustCompile(map[string]interface{}{ + "error": map[string]interface{}{ + "message": isdef.IsString, + "type": isdef.IsString, + }, + "monitor": map[string]interface{}{ + "duration.us": hbtestllext.IsInt64, + "id": uniqScope.IsUniqueTo(u), + "name": testMonFields.Name, + "type": testMonFields.Type, + "status": "down", + "check_group": uniqScope.IsUniqueTo(u), + }, + "state": isdef.Optional(hbtestllext.IsMonitorState), + }), + hbtestllext.MonitorTimespanValidator, + ) + } + + retryMonFields := testMonFields + retryMonFields.MaxAttempts = 2 + + for _, expected := range expected { + testCommonWrap(t, testDef{ + "multi-job-continuations-retry", + retryMonFields, + []jobs.Job{makeContJob(t, "http://foo.com")}, + []validator.Validator{ + contJobValidator("http://foo.com", "1st"), + lookslike.Compose( + contJobValidator("http://foo.com", "2nd"), + summarizertesthelper.SummaryValidator(expected.js.Up, expected.js.Down), + ), + contJobValidator("http://foo.com", "1st"), + lookslike.Compose( + contJobValidator("http://foo.com", "2nd"), + summarizertesthelper.SummaryValidator(expected.js.Up, expected.js.Down), + ), + }, + nil, + nil, + }) + } +} + func TestMultiJobContsCancelledEvents(t *testing.T) { uniqScope := isdef.ScopedIsUnique() @@ -387,14 +521,14 @@ func TestMultiJobContsCancelledEvents(t *testing.T) { ), lookslike.Compose( contJobValidator("http://foo.com", "2nd"), - summaryValidator(1, 0), + summarizertesthelper.SummaryValidator(1, 0), ), lookslike.Compose( contJobValidator("http://bar.com", "1st"), ), lookslike.Compose( contJobValidator("http://bar.com", "2nd"), - summaryValidator(1, 0), + summarizertesthelper.SummaryValidator(1, 0), ), }, []validator.Validator{ @@ -428,17 +562,6 @@ func stateValidator() validator.Validator { }) } -// This duplicates hbtest.SummaryChecks to avoid an import cycle. -// It could be refactored out, but it just isn't worth it. -func summaryValidator(up int, down int) validator.Validator { - return lookslike.MustCompile(map[string]interface{}{ - "summary": map[string]interface{}{ - "up": uint16(up), - "down": uint16(down), - }, - }) -} - func TestTimespan(t *testing.T) { now := time.Now() sched10s, err := schedule.Parse("@every 10s") @@ -485,10 +608,6 @@ type BrowserMonitor struct { name string checkGroup string durationMs int64 - // Used for testing legacy zip_url and local monitors - // where the top-level id/name are used to populate monitor.project - legacyProjectId string - legacyProjectName string } var inlineMonitorValues = BrowserMonitor{ @@ -504,16 +623,14 @@ func makeInlineBrowserJob(t *testing.T, u string) jobs.Job { eventext.MergeEventFields(event, mapstr.M{ "url": URLFields(parsed), "monitor": mapstr.M{ - "type": "browser", - "check_group": inlineMonitorValues.checkGroup, + "type": "browser", + "status": "up", }, }) return nil, nil } } -// Browser inline jobs monitor information should not be altered -// by the wrappers as they are handled separately in synth enricher func TestInlineBrowserJob(t *testing.T) { sFields := testBrowserMonFields sFields.ID = inlineMonitorValues.id @@ -527,13 +644,16 @@ func TestInlineBrowserJob(t *testing.T) { lookslike.Compose( urlValidator(t, "http://foo.com"), lookslike.MustCompile(map[string]interface{}{ + "state": isdef.Optional(hbtestllext.IsMonitorState), "monitor": map[string]interface{}{ "type": "browser", "id": inlineMonitorValues.id, "name": inlineMonitorValues.name, - "check_group": inlineMonitorValues.checkGroup, + "check_group": isdef.IsString, + "status": "up", }, }), + summarizertesthelper.SummaryValidator(1, 0), hbtestllext.MonitorTimespanValidator, ), ), @@ -550,15 +670,6 @@ var projectMonitorValues = BrowserMonitor{ durationMs: time.Second.Microseconds(), } -// Used for testing legacy zip_url / local monitorss -var legacyProjectMonitorValues = BrowserMonitor{ - id: "journey-1", - name: "Journey 1", - checkGroup: "acheckgroup", - legacyProjectId: "my-project", - legacyProjectName: "My Project", -} - func makeProjectBrowserJob(t *testing.T, u string, summary bool, projectErr error, bm BrowserMonitor) jobs.Job { parsed, err := url.Parse(u) require.NoError(t, err) @@ -567,11 +678,11 @@ func makeProjectBrowserJob(t *testing.T, u string, summary bool, projectErr erro eventext.MergeEventFields(event, mapstr.M{ "url": URLFields(parsed), "monitor": mapstr.M{ - "type": "browser", - "id": bm.id, - "name": bm.name, - "check_group": bm.checkGroup, - "duration": mapstr.M{"us": bm.durationMs}, + "type": "browser", + "id": bm.id, + "name": bm.name, + "status": "up", + "duration": mapstr.M{"us": bm.durationMs}, }, }) if summary { @@ -621,11 +732,12 @@ func TestProjectBrowserJob(t *testing.T) { "name": projectMonitorValues.name, "duration": mapstr.M{"us": time.Second.Microseconds()}, "origin": "my-origin", - "check_group": projectMonitorValues.checkGroup, + "check_group": isdef.IsString, "timespan": mapstr.M{ "gte": hbtestllext.IsTime, "lt": hbtestllext.IsTime, }, + "status": isdef.IsString, }, "url": URLFields(urlU), }), @@ -638,6 +750,7 @@ func TestProjectBrowserJob(t *testing.T) { []validator.Validator{ lookslike.Strict( lookslike.Compose( + summarizertesthelper.SummaryValidator(1, 0), urlValidator(t, urlStr), expectedMonFields, ))}, @@ -653,9 +766,9 @@ func TestProjectBrowserJob(t *testing.T) { lookslike.Compose( urlValidator(t, urlStr), expectedMonFields, + summarizertesthelper.SummaryValidator(1, 0), lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{"status": "up"}, - "summary": map[string]interface{}{"up": 1, "down": 0}, "event": map[string]interface{}{ "type": "heartbeat/summary", }, @@ -673,9 +786,9 @@ func TestProjectBrowserJob(t *testing.T) { lookslike.Compose( urlValidator(t, urlStr), expectedMonFields, + summarizertesthelper.SummaryValidator(0, 1), lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{"status": "down"}, - "summary": map[string]interface{}{"up": 0, "down": 1}, "error": map[string]interface{}{ "type": isdef.IsString, "message": "testerr", @@ -688,40 +801,6 @@ func TestProjectBrowserJob(t *testing.T) { nil, browserLogValidator(projectMonitorValues.id, time.Second.Microseconds(), 2, "down"), }) - - legacySFields := testBrowserMonFields - legacySFields.ID = legacyProjectMonitorValues.legacyProjectId - legacySFields.Name = legacyProjectMonitorValues.legacyProjectName - legacySFields.IsLegacyBrowserSource = true - - expectedLegacyMonFields := lookslike.MustCompile(map[string]interface{}{ - "monitor": map[string]interface{}{ - "type": "browser", - "id": legacyProjectMonitorValues.legacyProjectId, - "name": legacyProjectMonitorValues.legacyProjectName, - "duration": mapstr.M{"us": int64(0)}, - "check_group": legacyProjectMonitorValues.checkGroup, - "timespan": mapstr.M{ - "gte": hbtestllext.IsTime, - "lt": hbtestllext.IsTime, - }, - }, - "url": URLFields(urlU), - }) - - testCommonWrap(t, testDef{ - "legacy", // has no summary fields! - legacySFields, - []jobs.Job{makeProjectBrowserJob(t, urlStr, false, nil, legacyProjectMonitorValues)}, - []validator.Validator{ - lookslike.Strict( - lookslike.Compose( - urlValidator(t, urlStr), - expectedLegacyMonFields, - ))}, - nil, - nil, - }) } func TestECSErrors(t *testing.T) { diff --git a/heartbeat/tracer/tracer_test.go b/heartbeat/tracer/tracer_test.go index fc7d22762484..87953d5de5ae 100644 --- a/heartbeat/tracer/tracer_test.go +++ b/heartbeat/tracer/tracer_test.go @@ -123,7 +123,8 @@ func listenTilClosed(t *testing.T, sockPath string) []string { conn, err := listener.Accept() require.NoError(t, err) - var received []string + // no need to pre-allocate, but it seems to make the linter happy + received := make([]string, 0, 10) scanner := bufio.NewScanner(conn) for scanner.Scan() { received = append(received, scanner.Text()) diff --git a/x-pack/heartbeat/include/fields.go b/x-pack/heartbeat/include/fields.go index f284bbb9ea51..5952fb2168d1 100644 --- a/x-pack/heartbeat/include/fields.go +++ b/x-pack/heartbeat/include/fields.go @@ -19,5 +19,5 @@ func init() { // AssetFieldsYml returns asset data. // This is the base64 encoded zlib format compressed contents of fields.yml. func AssetFieldsYml() string { - return "" + return "" } diff --git a/x-pack/heartbeat/monitors/browser/browser.go b/x-pack/heartbeat/monitors/browser/browser.go index bd2695736436..d0dc11a328fe 100644 --- a/x-pack/heartbeat/monitors/browser/browser.go +++ b/x-pack/heartbeat/monitors/browser/browser.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package browser diff --git a/x-pack/heartbeat/monitors/browser/config.go b/x-pack/heartbeat/monitors/browser/config.go index baecdda959da..b03aa6c8b437 100644 --- a/x-pack/heartbeat/monitors/browser/config.go +++ b/x-pack/heartbeat/monitors/browser/config.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package browser diff --git a/x-pack/heartbeat/monitors/browser/config_test.go b/x-pack/heartbeat/monitors/browser/config_test.go index 7d11fc47b45d..8685e8d9797c 100644 --- a/x-pack/heartbeat/monitors/browser/config_test.go +++ b/x-pack/heartbeat/monitors/browser/config_test.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux +//go:build linux || synthetics package browser diff --git a/x-pack/heartbeat/monitors/browser/source/inline.go b/x-pack/heartbeat/monitors/browser/source/inline.go index 7c99ac01d84d..cc3ac4f78b54 100644 --- a/x-pack/heartbeat/monitors/browser/source/inline.go +++ b/x-pack/heartbeat/monitors/browser/source/inline.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package source diff --git a/x-pack/heartbeat/monitors/browser/source/inline_test.go b/x-pack/heartbeat/monitors/browser/source/inline_test.go index 1035a59f3bd8..cb5725ad75c9 100644 --- a/x-pack/heartbeat/monitors/browser/source/inline_test.go +++ b/x-pack/heartbeat/monitors/browser/source/inline_test.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package source diff --git a/x-pack/heartbeat/monitors/browser/source/local.go b/x-pack/heartbeat/monitors/browser/source/local.go index e87567cb3a75..7455655d4990 100644 --- a/x-pack/heartbeat/monitors/browser/source/local.go +++ b/x-pack/heartbeat/monitors/browser/source/local.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package source diff --git a/x-pack/heartbeat/monitors/browser/source/local_test.go b/x-pack/heartbeat/monitors/browser/source/local_test.go index 271107cd4180..c3a7f20a1c3b 100644 --- a/x-pack/heartbeat/monitors/browser/source/local_test.go +++ b/x-pack/heartbeat/monitors/browser/source/local_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux +//go:build linux || synthetics package source diff --git a/x-pack/heartbeat/monitors/browser/source/offline.go b/x-pack/heartbeat/monitors/browser/source/offline.go index c124208a56b7..4c3dd215f7d1 100644 --- a/x-pack/heartbeat/monitors/browser/source/offline.go +++ b/x-pack/heartbeat/monitors/browser/source/offline.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package source diff --git a/x-pack/heartbeat/monitors/browser/source/project.go b/x-pack/heartbeat/monitors/browser/source/project.go index 7caf3edcc2ef..af6bd96dfdc6 100644 --- a/x-pack/heartbeat/monitors/browser/source/project.go +++ b/x-pack/heartbeat/monitors/browser/source/project.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package source diff --git a/x-pack/heartbeat/monitors/browser/source/project_test.go b/x-pack/heartbeat/monitors/browser/source/project_test.go index 09dda4b5146c..2304a20f6a4b 100644 --- a/x-pack/heartbeat/monitors/browser/source/project_test.go +++ b/x-pack/heartbeat/monitors/browser/source/project_test.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux +//go:build linux || synthetics package source diff --git a/x-pack/heartbeat/monitors/browser/source/source.go b/x-pack/heartbeat/monitors/browser/source/source.go index dbc442d7785f..21be17b5621f 100644 --- a/x-pack/heartbeat/monitors/browser/source/source.go +++ b/x-pack/heartbeat/monitors/browser/source/source.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package source diff --git a/x-pack/heartbeat/monitors/browser/source/source_test.go b/x-pack/heartbeat/monitors/browser/source/source_test.go index adb09e57345c..6aa6716152a6 100644 --- a/x-pack/heartbeat/monitors/browser/source/source_test.go +++ b/x-pack/heartbeat/monitors/browser/source/source_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux +//go:build linux || synthetics package source diff --git a/x-pack/heartbeat/monitors/browser/source/unzip.go b/x-pack/heartbeat/monitors/browser/source/unzip.go index c9b48c5464da..d8a5f617302f 100644 --- a/x-pack/heartbeat/monitors/browser/source/unzip.go +++ b/x-pack/heartbeat/monitors/browser/source/unzip.go @@ -2,8 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin -// +build linux darwin +//go:build linux || darwin || synthetics package source diff --git a/x-pack/heartbeat/monitors/browser/source/zipurl.go b/x-pack/heartbeat/monitors/browser/source/zipurl.go index d8c035000b2e..748b5a8acbf1 100644 --- a/x-pack/heartbeat/monitors/browser/source/zipurl.go +++ b/x-pack/heartbeat/monitors/browser/source/zipurl.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package source diff --git a/x-pack/heartbeat/monitors/browser/source/zipurl_test.go b/x-pack/heartbeat/monitors/browser/source/zipurl_test.go index ca0d47b21b74..468e7282c83b 100644 --- a/x-pack/heartbeat/monitors/browser/source/zipurl_test.go +++ b/x-pack/heartbeat/monitors/browser/source/zipurl_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux +//go:build linux || synthetics package source diff --git a/x-pack/heartbeat/monitors/browser/sourcejob.go b/x-pack/heartbeat/monitors/browser/sourcejob.go index d8ca78b23e3b..c62c50b3bb17 100644 --- a/x-pack/heartbeat/monitors/browser/sourcejob.go +++ b/x-pack/heartbeat/monitors/browser/sourcejob.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package browser diff --git a/x-pack/heartbeat/monitors/browser/sourcejob_test.go b/x-pack/heartbeat/monitors/browser/sourcejob_test.go index 69cd4f7ffa4b..0e6127d354a7 100644 --- a/x-pack/heartbeat/monitors/browser/sourcejob_test.go +++ b/x-pack/heartbeat/monitors/browser/sourcejob_test.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux +//go:build linux || synthetics package browser diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go index de78fd4bb00c..627f97aebb8a 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package synthexec @@ -24,10 +24,9 @@ import ( type enricher func(event *beat.Event, se *SynthEvent) error type streamEnricher struct { - je *journeyEnricher - journeyCount int - sFields stdfields.StdMonitorFields - checkGroup string + je *journeyEnricher + sFields stdfields.StdMonitorFields + checkGroup string } func newStreamEnricher(sFields stdfields.StdMonitorFields) *streamEnricher { @@ -39,15 +38,6 @@ func (senr *streamEnricher) enrich(event *beat.Event, se *SynthEvent) error { senr.je = newJourneyEnricher(senr) } - // TODO: Remove this when zip monitors are removed and we have 1:1 monitor / journey - if se != nil && se.Type == JourneyStart { - senr.journeyCount++ - if senr.journeyCount > 1 { - senr.checkGroup = makeUuid() - } - } - - eventext.MergeEventFields(event, map[string]interface{}{"monitor": map[string]interface{}{"check_group": senr.checkGroup}}) return senr.je.enrich(event, se) } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go index c02a953af1b0..2f660b09642c 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux +//go:build linux || synthetics package synthexec @@ -20,7 +20,6 @@ import ( "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/go-lookslike" - "github.com/elastic/go-lookslike/isdef" "github.com/elastic/go-lookslike/testslike" "github.com/elastic/go-lookslike/validator" ) @@ -103,7 +102,6 @@ func TestJourneyEnricher(t *testing.T) { "synthetics.type": "heartbeat/summary", "url": wrappers.URLFields(u), "monitor.duration.us": int64(journeyEnd.Timestamp().Sub(journeyStart.Timestamp()) / time.Microsecond), - "monitor.check_group": isdef.IsString, })) } return lookslike.Compose(v...) @@ -125,29 +123,9 @@ func TestJourneyEnricher(t *testing.T) { }) } - tests := []struct { - name string - IsLegacyBrowserSource bool - }{ - { - name: "legacy project monitor", - IsLegacyBrowserSource: true, - }, - { - name: "modern monitor", - IsLegacyBrowserSource: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - sFields.IsLegacyBrowserSource = tt.IsLegacyBrowserSource - - je := makeTestJourneyEnricher(sFields) - for _, se := range synthEvents { - check(t, se, je) - } - }) + je := makeTestJourneyEnricher(sFields) + for _, se := range synthEvents { + check(t, se, je) } } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go index f1fd358cec36..f3684398a513 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package synthexec diff --git a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go index c3ffe5807808..33f01b9c6b4d 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/execmultiplexer_test.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux +//go:build linux || synthetics package synthexec @@ -16,7 +16,7 @@ func TestExecMultiplexer(t *testing.T) { em := NewExecMultiplexer() // Generate three fake journeys with three fake steps - var testEvents []*SynthEvent + testEvents := make([]*SynthEvent, 0, 3) time := float64(0) for jIdx := 0; jIdx < 3; jIdx++ { time++ // fake time to make events seem spaced out diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go b/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go index f422f8b71bfb..fbfb71526cc5 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthexec.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package synthexec diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthexec_linux.go b/x-pack/heartbeat/monitors/browser/synthexec/synthexec_linux.go index f2c3ae8aaf6f..c90f6083a0dd 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthexec_linux.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthexec_linux.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux +//go:build linux || synthetics package synthexec diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go b/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go index 7d54581f9266..762b12358a72 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthexec_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux +//go:build linux || synthetics package synthexec diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go index e091914b0afe..974a53174350 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package synthexec diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes_test.go b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes_test.go index eac2957d8782..b26868b5b692 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes_test.go @@ -1,7 +1,7 @@ // Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package synthexec diff --git a/x-pack/heartbeat/monitors/browser/synthexec/testcmd/main.go b/x-pack/heartbeat/monitors/browser/synthexec/testcmd/main.go index 01f4bbb04a4b..b4d241b19fee 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/testcmd/main.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/testcmd/main.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package main diff --git a/x-pack/heartbeat/scenarios/basics_test.go b/x-pack/heartbeat/scenarios/basics_test.go index 06ebb39f15ba..da19b2264f74 100644 --- a/x-pack/heartbeat/scenarios/basics_test.go +++ b/x-pack/heartbeat/scenarios/basics_test.go @@ -17,14 +17,22 @@ import ( _ "github.com/elastic/beats/v7/heartbeat/monitors/active/http" _ "github.com/elastic/beats/v7/heartbeat/monitors/active/icmp" _ "github.com/elastic/beats/v7/heartbeat/monitors/active/tcp" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/summarizertesthelper" "github.com/elastic/beats/v7/x-pack/heartbeat/scenarios/framework" ) +type CheckHistItem struct { + cg string + summary *summarizer.JobSummary +} + func TestSimpleScenariosBasicFields(t *testing.T) { - scenarioDB.RunAll(t, func(t *testing.T, mtr *framework.MonitorTestRun, err error) { + t.Parallel() + runner := func(t *testing.T, mtr *framework.MonitorTestRun, err error) { require.GreaterOrEqual(t, len(mtr.Events()), 1) - lastCg := "" - for i, e := range mtr.Events() { + var checkHist []*CheckHistItem + for _, e := range mtr.Events() { testslike.Test(t, lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{ "id": mtr.StdFields.ID, @@ -34,20 +42,45 @@ func TestSimpleScenariosBasicFields(t *testing.T) { }, }), e.Fields) - // Ensure that all check groups are equal and don't change - cg, err := e.GetValue("monitor.check_group") + // Ensure that all check groups are equal and don't except across retries + cgIface, err := e.GetValue("monitor.check_group") require.NoError(t, err) - cgStr := cg.(string) - if i == 0 { - lastCg = cgStr - } else { - require.Equal(t, lastCg, cgStr) + cg := cgIface.(string) + + var summary *summarizer.JobSummary + summaryIface, err := e.GetValue("summary") + if err == nil { + summary = summaryIface.(*summarizer.JobSummary) + } + + var lastCheck *CheckHistItem + if len(checkHist) > 0 { + lastCheck = checkHist[len(checkHist)-1] + } + + curCheck := &CheckHistItem{cg: cg, summary: summary} + + checkHist = append(checkHist, curCheck) + + // If we have a prior check + if lastCheck != nil { + // If the last event was a summary, meaning this one is a retry + if lastCheck.summary != nil { + // then we expect a new check group + require.NotEqual(t, lastCheck.cg, curCheck.cg) + } else { + // If we're within the same check due to multiple continuations + // we expect equality + require.Equal(t, lastCheck.cg, curCheck.cg) + } } } - }) + } + scenarioDB.RunAllWithSeparateTwists(t, []*framework.Twist{TwistMaxAttempts(2)}, runner) } func TestLightweightUrls(t *testing.T) { + t.Parallel() scenarioDB.RunTag(t, "lightweight", func(t *testing.T, mtr *framework.MonitorTestRun, err error) { for _, e := range mtr.Events() { testslike.Test(t, lookslike.MustCompile(map[string]interface{}{ @@ -62,15 +95,13 @@ func TestLightweightUrls(t *testing.T) { } func TestLightweightSummaries(t *testing.T) { + t.Parallel() scenarioDB.RunTag(t, "lightweight", func(t *testing.T, mtr *framework.MonitorTestRun, err error) { all := mtr.Events() lastEvent, firstEvents := all[len(all)-1], all[:len(all)-1] - testslike.Test(t, lookslike.MustCompile(map[string]interface{}{ - "summary": map[string]interface{}{ - "up": hbtestllext.IsUint16, - "down": hbtestllext.IsUint16, - }, - }), lastEvent.Fields) + testslike.Test(t, + summarizertesthelper.SummaryValidator(1, 0), + lastEvent.Fields) for _, e := range firstEvents { summary, _ := e.GetValue("summary") @@ -80,17 +111,25 @@ func TestLightweightSummaries(t *testing.T) { } func TestRunFromOverride(t *testing.T) { + t.Parallel() scenarioDB.RunAllWithATwist(t, TwistAddRunFrom, func(t *testing.T, mtr *framework.MonitorTestRun, err error) { - for _, e := range mtr.Events() { - testslike.Test(t, lookslike.MustCompile(map[string]interface{}{ - "state": hbtestllext.IsMonitorStateInLocation(TestLocationDefault.ID), + for idx, e := range mtr.Events() { + stateIsDef := isdef.KeyMissing + isLast := idx+1 == len(mtr.Events()) + if isLast { + stateIsDef = hbtestllext.IsMonitorStateInLocation(TestLocationDefault.ID) + } + validator := lookslike.MustCompile(map[string]interface{}{ + "state": stateIsDef, "observer": map[string]interface{}{ "name": TestLocationDefault.ID, "geo": map[string]interface{}{ "name": TestLocationDefault.Geo.Name, }, }, - }), e.Fields) + }) + + testslike.Test(t, validator, e.Fields) } }) } diff --git a/x-pack/heartbeat/scenarios/browserscenarios.go b/x-pack/heartbeat/scenarios/browserscenarios.go index b36e9b07b49d..0cfce6831f43 100644 --- a/x-pack/heartbeat/scenarios/browserscenarios.go +++ b/x-pack/heartbeat/scenarios/browserscenarios.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build linux || darwin +//go:build linux || darwin || synthetics package scenarios diff --git a/x-pack/heartbeat/scenarios/framework/fakeloader.go b/x-pack/heartbeat/scenarios/framework/fakeloader.go index 8864989a92e1..d5e1ee0a26ab 100644 --- a/x-pack/heartbeat/scenarios/framework/fakeloader.go +++ b/x-pack/heartbeat/scenarios/framework/fakeloader.go @@ -5,8 +5,8 @@ package framework import ( + "fmt" "sync" - "time" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" @@ -16,40 +16,43 @@ import ( // without actually using ES type loaderDB struct { keysToState map[string]*monitorstate.State - mtx *sync.Mutex - lastTime time.Time + mtx sync.Mutex } func newLoaderDB() *loaderDB { return &loaderDB{ keysToState: map[string]*monitorstate.State{}, - mtx: &sync.Mutex{}, + mtx: sync.Mutex{}, } } -func (ldb loaderDB) AddState(sf stdfields.StdMonitorFields, state *monitorstate.State) { +func (ldb *loaderDB) AddState(sf stdfields.StdMonitorFields, state *monitorstate.State) { ldb.mtx.Lock() defer ldb.mtx.Unlock() - ldb.lastTime = time.Now() - - ldb.keysToState[monitorstate.LoaderDBKey(sf, ldb.lastTime, 0)] = state + key := keyFor(sf) + ldb.keysToState[key] = state } -func (ldb loaderDB) GetState(sf stdfields.StdMonitorFields) *monitorstate.State { +func (ldb *loaderDB) GetState(sf stdfields.StdMonitorFields) *monitorstate.State { ldb.mtx.Lock() defer ldb.mtx.Unlock() - found := ldb.keysToState[monitorstate.LoaderDBKey(sf, ldb.lastTime, 0)] + key := keyFor(sf) + found := ldb.keysToState[key] return found } -func (ldb loaderDB) StateLoader() monitorstate.StateLoader { - return func(sf stdfields.StdMonitorFields) (*monitorstate.State, error) { - ldb.mtx.Lock() - defer ldb.mtx.Unlock() +func keyFor(sf stdfields.StdMonitorFields) string { + rfid := "default" + if sf.RunFrom != nil { + rfid = sf.RunFrom.ID + } + return fmt.Sprintf("%s-%s", rfid, sf.ID) +} - found := ldb.keysToState[monitorstate.LoaderDBKey(sf, ldb.lastTime, 0)] - return found, nil +func (ldb *loaderDB) StateLoader() monitorstate.StateLoader { + return func(sf stdfields.StdMonitorFields) (*monitorstate.State, error) { + return ldb.GetState(sf), nil } } diff --git a/x-pack/heartbeat/scenarios/framework/framework.go b/x-pack/heartbeat/scenarios/framework/framework.go index 1d43f5d3e78c..2a092bb73ef4 100644 --- a/x-pack/heartbeat/scenarios/framework/framework.go +++ b/x-pack/heartbeat/scenarios/framework/framework.go @@ -40,24 +40,32 @@ type Scenario struct { NumberOfRuns int } -type Twist func(Scenario) Scenario +type Twist struct { + Name string + Fn func(Scenario) Scenario +} -func MakeTwist(name string, fn Twist) Twist { - return func(s Scenario) Scenario { - newS := s.clone() - newS.Name = fmt.Sprintf("%s~<%s>", s.Name, name) - return fn(newS) +func MakeTwist(name string, fn func(Scenario) Scenario) *Twist { + return &Twist{ + Name: name, + Fn: func(s Scenario) Scenario { + newS := s.clone() + newS.Name = fmt.Sprintf("%s~<%s>", s.Name, name) + return fn(newS) + }, } } -func MultiTwist(twists ...Twist) Twist { - return func(s Scenario) Scenario { - res := s - for _, twist := range twists { - res = twist(res) - } - return res - } +func MultiTwist(twists ...*Twist) *Twist { + return MakeTwist( + "<~MULTI-TWIST~[", + func(s Scenario) Scenario { + res := s + for _, twist := range twists { + res = twist.Fn(res) + } + return res + }) } func (s Scenario) clone() Scenario { @@ -69,10 +77,10 @@ func (s Scenario) clone() Scenario { return copy } -func (s Scenario) Run(t *testing.T, twist Twist, callback func(t *testing.T, mtr *MonitorTestRun, err error)) { +func (s Scenario) Run(t *testing.T, twist *Twist, callback func(t *testing.T, mtr *MonitorTestRun, err error)) { runS := s if twist != nil { - runS = twist(s.clone()) + runS = twist.Fn(s.clone()) } cfgMap, rClose, err := runS.Runner(t) @@ -106,12 +114,13 @@ func (s Scenario) Run(t *testing.T, twist Twist, callback func(t *testing.T, mtr mtr.wait() events = append(events, mtr.Events()...) + sf = mtr.StdFields + conf = mtr.Config + if lse := LastState(events).State; lse != nil { loaderDB.AddState(mtr.StdFields, lse) } - sf = mtr.StdFields - conf = mtr.Config mtr.close() } @@ -143,14 +152,10 @@ func NewScenarioDB() *ScenarioDB { } func (sdb *ScenarioDB) Init() { - var prunedList []Scenario - browserCapable := os.Getenv("ELASTIC_SYNTHETICS_CAPABLE") == "true" - icmpCapable := os.Getenv("ELASTIC_ICMP_CAPABLE") == "true" sdb.initOnce.Do(func() { + var prunedList []Scenario + icmpCapable := os.Getenv("ELASTIC_ICMP_CAPABLE") == "true" for _, s := range sdb.All { - if s.Type == "browser" && !browserCapable { - continue - } if s.Type == "icmp" && !icmpCapable { continue } @@ -160,8 +165,8 @@ func (sdb *ScenarioDB) Init() { sdb.ByTag[t] = append(sdb.ByTag[t], s) } } + sdb.All = prunedList }) - sdb.All = prunedList } func (sdb *ScenarioDB) Add(s ...Scenario) { @@ -172,7 +177,16 @@ func (sdb *ScenarioDB) RunAll(t *testing.T, callback func(*testing.T, *MonitorTe sdb.RunAllWithATwist(t, nil, callback) } -func (sdb *ScenarioDB) RunAllWithATwist(t *testing.T, twist Twist, callback func(*testing.T, *MonitorTestRun, error)) { +// RunAllWithSeparateTwists runs a list of twists separately, but not chained together. +// This is helpful for building up a test matrix by composing twists. +func (sdb *ScenarioDB) RunAllWithSeparateTwists(t *testing.T, twists []*Twist, callback func(*testing.T, *MonitorTestRun, error)) { + twists = append(twists, nil) // we also run once with no twists + for _, twist := range twists { + sdb.RunAllWithATwist(t, twist, callback) + } +} + +func (sdb *ScenarioDB) RunAllWithATwist(t *testing.T, twist *Twist, callback func(*testing.T, *MonitorTestRun, error)) { sdb.Init() for _, s := range sdb.All { s.Run(t, twist, callback) @@ -183,7 +197,7 @@ func (sdb *ScenarioDB) RunTag(t *testing.T, tagName string, callback func(*testi sdb.RunTagWithATwist(t, tagName, nil, callback) } -func (sdb *ScenarioDB) RunTagWithATwist(t *testing.T, tagName string, twist Twist, callback func(*testing.T, *MonitorTestRun, error)) { +func (sdb *ScenarioDB) RunTagWithATwist(t *testing.T, tagName string, twist *Twist, callback func(*testing.T, *MonitorTestRun, error)) { sdb.Init() if len(sdb.ByTag[tagName]) < 1 { require.Failf(t, "no scenarios have tags matching %s", tagName) @@ -204,8 +218,10 @@ type MonitorTestRun struct { func runMonitorOnce(t *testing.T, monitorConfig mapstr.M, location *hbconfig.LocationWithID, stateLoader monitorstate.StateLoader) (mtr *MonitorTestRun, err error) { mtr = &MonitorTestRun{ - Config: monitorConfig, - StdFields: stdfields.StdMonitorFields{}, + Config: monitorConfig, + StdFields: stdfields.StdMonitorFields{ + RunFrom: location, + }, } // make a pipeline diff --git a/x-pack/heartbeat/scenarios/scenarios.go b/x-pack/heartbeat/scenarios/scenarios.go index e67f5054805c..fe0e1bbee164 100644 --- a/x-pack/heartbeat/scenarios/scenarios.go +++ b/x-pack/heartbeat/scenarios/scenarios.go @@ -5,55 +5,19 @@ package scenarios import ( - "context" "fmt" - "net/http" "net/http/httptest" "net/url" - "sync" "testing" - "time" - - "github.com/stretchr/testify/require" "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/beats/v7/heartbeat/hbtest" "github.com/elastic/beats/v7/x-pack/heartbeat/scenarios/framework" ) var scenarioDB = framework.NewScenarioDB() var testWs *httptest.Server -var testWsOnce = &sync.Once{} - -// Starting this thing up is expensive, let's just do it once -func startTestWebserver(t *testing.T) *httptest.Server { - testWsOnce.Do(func() { - testWs = httptest.NewServer(hbtest.HelloWorldHandler(200)) - var err error - for i := 0; i < 20; i++ { - var resp *http.Response - req, _ := http.NewRequestWithContext(context.Background(), http.MethodGet, testWs.URL, nil) - resp, err = http.DefaultClient.Do(req) - if err == nil { - resp.Body.Close() - if resp.StatusCode == 200 { - break - } - } - - time.Sleep(time.Millisecond * 250) - } - - if err != nil { - require.NoError(t, err, "could not retrieve successful response from test webserver") - } - }) - - return testWs -} - // Note, no browser scenarios here, those all go in browserscenarios.go // since they have different build tags func init() { diff --git a/x-pack/heartbeat/scenarios/stateloader_test.go b/x-pack/heartbeat/scenarios/stateloader_test.go index 79cd2abf0b4b..e3ea54a06910 100644 --- a/x-pack/heartbeat/scenarios/stateloader_test.go +++ b/x-pack/heartbeat/scenarios/stateloader_test.go @@ -7,23 +7,39 @@ package scenarios import ( "testing" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/x-pack/heartbeat/scenarios/framework" ) -var esIntegTwists = framework.MultiTwist(TwistAddRunFrom, TwistMultiRun(3)) +const numRuns = 2 + +var esIntegTwists = framework.MultiTwist(TwistAddRunFrom, TwistMultiRun(numRuns)) func TestStateContinuity(t *testing.T) { + t.Parallel() scenarioDB.RunAllWithATwist(t, esIntegTwists, func(t *testing.T, mtr *framework.MonitorTestRun, err error) { + events := mtr.Events() + var errors = []*beat.Event{} + var sout string + for _, e := range events { + if message, ok := e.GetValue("synthetics.payload.message"); ok == nil { + sout = sout + "\n" + message.(string) + } + if _, ok := e.GetValue("error"); ok == nil { + errors = append(errors, e) + } + } + lastSS := framework.LastState(mtr.Events()) - require.Equal(t, monitorstate.StatusUp, lastSS.State.Status) + assert.Equal(t, monitorstate.StatusUp, lastSS.State.Status, "monitor was unexpectedly down, synthetics console output: %s, errors", sout, errors) allSS := framework.AllStates(mtr.Events()) - require.Len(t, allSS, 3) + assert.Len(t, allSS, numRuns) - require.Equal(t, 3, lastSS.State.Checks) + assert.Equal(t, numRuns, lastSS.State.Checks) }) } diff --git a/x-pack/heartbeat/scenarios/testws.go b/x-pack/heartbeat/scenarios/testws.go new file mode 100644 index 000000000000..badfdb272364 --- /dev/null +++ b/x-pack/heartbeat/scenarios/testws.go @@ -0,0 +1,68 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package scenarios + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/heartbeat/hbtest" +) + +var testWsOnce = &sync.Once{} + +// Starting this thing up is expensive, let's just do it once +func startTestWebserver(t *testing.T) *httptest.Server { + testWsOnce.Do(func() { + testWs = httptest.NewServer(hbtest.HelloWorldHandler(200)) + + waitForWs(t, testWs.URL) + }) + + return testWs +} + +func StartStatefulTestWS(t *testing.T, statuses []int) *httptest.Server { + mtx := sync.Mutex{} + statusIdx := 0 + testWs = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mtx.Lock() + defer mtx.Unlock() + + statusIdx++ + if statusIdx > len(statuses)-1 { + statusIdx = 0 + } + + status := statuses[statusIdx] + w.WriteHeader(status) + _, _ = w.Write([]byte(fmt.Sprintf("Status: %d", status))) + })) + + // wait for ws to become available + waitForWs(t, testWs.URL) + + return testWs +} + +func waitForWs(t *testing.T, url string) { + require.Eventuallyf( + t, + func() bool { + req, _ := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil) + resp, _ := http.DefaultClient.Do(req) + resp.Body.Close() + return resp.StatusCode == 200 + }, + 10*time.Second, 250*time.Millisecond, "could not start webserver", + ) +} diff --git a/x-pack/heartbeat/scenarios/twists.go b/x-pack/heartbeat/scenarios/twists.go index f612aa2c61f2..3109b5e73d9b 100644 --- a/x-pack/heartbeat/scenarios/twists.go +++ b/x-pack/heartbeat/scenarios/twists.go @@ -6,10 +6,12 @@ package scenarios import ( "fmt" + "testing" "github.com/elastic/beats/v7/heartbeat/config" "github.com/elastic/beats/v7/libbeat/processors/util" "github.com/elastic/beats/v7/x-pack/heartbeat/scenarios/framework" + "github.com/elastic/elastic-agent-libs/mapstr" ) var TestLocationDefault = TestLocationMpls @@ -27,9 +29,22 @@ var TwistAddRunFrom = framework.MakeTwist("add run_from", func(s framework.Scena return s }) -func TwistMultiRun(times int) framework.Twist { +func TwistMultiRun(times int) *framework.Twist { return framework.MakeTwist(fmt.Sprintf("run %d times", times), func(s framework.Scenario) framework.Scenario { s.NumberOfRuns = times return s }) } + +func TwistMaxAttempts(maxAttempts int) *framework.Twist { + return framework.MakeTwist(fmt.Sprintf("run with %d max_attempts", maxAttempts), func(s framework.Scenario) framework.Scenario { + s.Tags = append(s.Tags, "retry") + origRunner := s.Runner + s.Runner = func(t *testing.T) (config mapstr.M, close func(), err error) { + config, close, err = origRunner(t) + config["max_attempts"] = maxAttempts + return config, close, err + } + return s + }) +}