diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 5f35daf8fd75..e66ae58f2c15 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -19,6 +19,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 
 *Heartbeat*
 
+- Fix broken macOS ICMP python e2e test. {pull}29900[29900]
 - Only add monitor.status to browser events when summary. {pull}29460[29460]
 - Also add summary to journeys for which the synthetics runner crashes. {pull}29606[29606]
 
@@ -51,6 +52,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 *Heartbeat*
 
 - Fix race condition in http monitors using `mode:all` that can cause crashes. {pull}29697[pull]
+- Fix broken ICMP availability check that prevented heartbeat from starting in rare cases. {pull}29413[29413]
 
 *Metricbeat*
 
@@ -83,6 +85,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
 
 *Heartbeat*
 
+- More errors are now visible in ES with new logic failing monitors later to ease debugging. {pull}29413[29413]
+
 *Metricbeat*
 
diff --git a/heartbeat/monitors/active/icmp/icmp.go b/heartbeat/monitors/active/icmp/icmp.go
index ef57cdbebae3..073660c259af 100644
--- a/heartbeat/monitors/active/icmp/icmp.go
+++ b/heartbeat/monitors/active/icmp/icmp.go
@@ -91,10 +91,6 @@ func (jf *jobFactory) checkConfig() error {
 }
 
 func (jf *jobFactory) makePlugin() (plugin2 plugin.Plugin, err error) {
-	if err := jf.loop.checkNetworkMode(jf.ipVersion); err != nil {
-		return plugin.Plugin{}, err
-	}
-
 	pingFactory := jf.pingIPFactory(&jf.config)
 
 	var j []jobs.Job
diff --git a/heartbeat/monitors/active/icmp/loop.go b/heartbeat/monitors/active/icmp/loop.go
index de4d0ef4dfc2..b29fa247f16d 100644
--- a/heartbeat/monitors/active/icmp/loop.go
+++ b/heartbeat/monitors/active/icmp/loop.go
@@ -23,7 +23,6 @@ import (
 )
 
 type ICMPLoop interface {
-	checkNetworkMode(mode string) error
 	ping(
 		addr *net.IPAddr,
 		timeout time.Duration,
diff --git a/heartbeat/monitors/active/icmp/stdloop.go b/heartbeat/monitors/active/icmp/stdloop.go
index 05858f5537fc..9f5f55439678 100644
--- a/heartbeat/monitors/active/icmp/stdloop.go
+++ b/heartbeat/monitors/active/icmp/stdloop.go
@@ -20,7 +20,6 @@ package icmp
 import (
 	"bytes"
 	"encoding/binary"
-	"errors"
 	"fmt"
 	"math/rand"
 	"net"
@@ -159,29 +158,6 @@ func newICMPLoop() (*stdICMPLoop, error) {
 	return l, nil
 }
 
-func (l *stdICMPLoop) checkNetworkMode(mode string) error {
-	ip4, ip6 := false, false
-	switch mode {
-	case "ip4":
-		ip4 = true
-	case "ip6":
-		ip6 = true
-	case "ip":
-		ip4, ip6 = true, true
-	default:
-		return fmt.Errorf("'%v' is not supported", mode)
-	}
-
-	if ip4 && l.conn4 == nil {
-		return errors.New("failed to initiate IPv4 support. Check log details for permission configuration")
-	}
-	if ip6 && l.conn6 == nil {
-		return errors.New("failed to initiate IPv6 support. Check log details for permission configuration")
-	}
-
-	return nil
-}
-
 func (l *stdICMPLoop) runICMPRecv(conn *icmp.PacketConn, proto int) {
 	for {
 		bytes := make([]byte, 512)
@@ -251,6 +227,14 @@ func (l *stdICMPLoop) ping(
 	timeout time.Duration,
 	interval time.Duration,
 ) (time.Duration, int, error) {
+	isIPv6 := addr.IP.To4() == nil
+	if isIPv6 && l.conn6 == nil {
+		return -1, -1, fmt.Errorf("cannot ping IPv6 address '%s', no IPv6 connection available", addr)
+	}
+	if !isIPv6 && l.conn4 == nil {
+		return -1, -1, fmt.Errorf("cannot ping IPv4 address '%s', no IPv4 connection available", addr)
+	}
+
 	var err error
 	toTimer := time.NewTimer(timeout)
 	defer toTimer.Stop()
@@ -379,7 +363,7 @@ func (l *stdICMPLoop) sendEchoRequest(addr *net.IPAddr) (*requestContext, error)
 
 	_, err := conn.WriteTo(encoded, addr)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("could not write to conn: %w", err)
 	}
 
 	ctx.ts = ts
diff --git a/heartbeat/monitors/factory_test.go b/heartbeat/monitors/factory_test.go
index c395050aaa1e..e4ff3589ccd8 100644
--- a/heartbeat/monitors/factory_test.go
+++ b/heartbeat/monitors/factory_test.go
@@ -149,7 +149,7 @@ func TestPreProcessors(t *testing.T) {
 }
 
 func TestDuplicateMonitorIDs(t *testing.T) {
-	serverMonConf := mockPluginConf(t, "custom", "@every 1ms", "http://example.net")
+	serverMonConf := mockPluginConf(t, "custom", "custom", "@every 1ms", "http://example.net")
 	badConf := mockBadPluginConf(t, "custom", "@every 1ms")
 	reg, built, closed := mockPluginsReg()
 	pipelineConnector := &MockPipelineConnector{}
@@ -190,8 +190,9 @@ func TestDuplicateMonitorIDs(t *testing.T) {
 	m1.Stop()
 	m2.Stop()
 
-	// 3 are counted as built, even the bad config
-	require.Equal(t, 3, built.Load())
+	// Two are counted as built. The bad config is missing a stdfield so it
+	// doesn't complete construction
+	require.Equal(t, 2, built.Load())
 	// Only 2 closes, because the bad config isn't closed
 	require.Equal(t, 2, closed.Load())
 }
diff --git a/heartbeat/monitors/mocks_test.go b/heartbeat/monitors/mocks_test.go
index 6d51791e3d72..720088db211f 100644
--- a/heartbeat/monitors/mocks_test.go
+++ b/heartbeat/monitors/mocks_test.go
@@ -99,24 +99,28 @@ func (pc *MockPipelineConnector) ConnectWith(beat.ClientConfig) (beat.Client, er
 	return c, nil
 }
 
-func mockEventMonitorValidator(id string) validator.Validator {
+func baseMockEventMonitorValidator(id string, name string, status string) validator.Validator {
 	var idMatcher isdef.IsDef
 	if id == "" {
 		idMatcher = isdef.IsStringMatching(regexp.MustCompile(`^auto-test-.*`))
 	} else {
 		idMatcher = isdef.IsEqual(id)
 	}
+	return lookslike.MustCompile(map[string]interface{}{
+		"monitor": map[string]interface{}{
+			"id":          idMatcher,
+			"name":        name,
+			"type":        "test",
+			"duration.us": isdef.IsDuration,
+			"status":      status,
+			"check_group": isdef.IsString,
+		},
+	})
+}
+
+func mockEventMonitorValidator(id string, name string) validator.Validator {
 	return lookslike.Strict(lookslike.Compose(
-		lookslike.MustCompile(map[string]interface{}{
-			"monitor": map[string]interface{}{
-				"id":          idMatcher,
-				"name":        "",
-				"type":        "test",
-				"duration.us": isdef.IsDuration,
-				"status":      "up",
-				"check_group": isdef.IsString,
-			},
-		}),
+		baseMockEventMonitorValidator(id, name, "up"),
 		hbtestllext.MonitorTimespanValidator,
 		hbtest.SummaryChecks(1, 0),
 		lookslike.MustCompile(mockEventCustomFields()),
@@ -151,15 +155,19 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) {
 			unpacked := struct {
 				URLs []string `config:"urls" validate:"required"`
 			}{}
-			err := config.Unpack(&unpacked)
-			if err != nil {
-				return plugin.Plugin{}, err
-			}
-			j, err := createMockJob()
+
+			// track all closes, even on error
 			closer := func() error {
 				closed.Inc()
 				return nil
 			}
+
+			err := config.Unpack(&unpacked)
+			if err != nil {
+				return plugin.Plugin{DoClose: closer}, err
+			}
+			j, err := createMockJob()
+
 			return plugin.Plugin{Jobs: j, DoClose: closer, Endpoints: 1}, err
 		},
 		Stats: plugin.NewPluginCountersRecorder("test", reg)},
@@ -174,13 +182,15 @@ func mockPluginsReg() (p *plugin.PluginsReg, built *atomic.Int, closed *atomic.I
 	return reg, built, closed
 }
 
-func mockPluginConf(t *testing.T, id string, schedule string, url string) *common.Config {
+func mockPluginConf(t *testing.T, id string, name string, schedule string, url string) *common.Config {
 	confMap := map[string]interface{}{
 		"type":     "test",
 		"urls":     []string{url},
 		"schedule": schedule,
+		"name":     name,
 	}
 
+	// Optional to let us simulate this key missing
 	if id != "" {
 		confMap["id"] = id
 	}
@@ -197,7 +207,6 @@ func mockBadPluginConf(t *testing.T, id string, schedule string) *common.Config
 	confMap := map[string]interface{}{
 		"type":        "test",
 		"notanoption": []string{"foo"},
-		"schedule":    schedule,
 	}
 
 	if id != "" {
@@ -210,8 +219,6 @@ func mockBadPluginConf(t *testing.T, id string, schedule string) *common.Config
 	return conf
 }
 
-// mockInvalidPlugin conf returns a config that invalid at the basic level of
-// what's expected in heartbeat, i.e. no type.
 func mockInvalidPluginConf(t *testing.T) *common.Config {
 	confMap := map[string]interface{}{
 		"hoeutnheou": "oueanthoue",
@@ -222,3 +229,17 @@ func mockInvalidPluginConf(t *testing.T) *common.Config {
 
 	return conf
 }
+
+func mockInvalidPluginConfWithStdFields(t *testing.T, id string, name string, schedule string) *common.Config {
+	confMap := map[string]interface{}{
+		"type":     "test",
+		"id":       id,
+		"name":     name,
+		"schedule": schedule,
+	}
+
+	conf, err := common.NewConfigFrom(confMap)
+	require.NoError(t, err)
+
+	return conf
+}
diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go
index 669579e31aa0..91a6a881d848 100644
--- a/heartbeat/monitors/monitor.go
+++ b/heartbeat/monitors/monitor.go
@@ -163,13 +163,26 @@ func newMonitorUnsafe(
 		return p.Close()
 	}
 
-	wrappedJobs := wrappers.WrapCommon(p.Jobs, m.stdFields)
-	m.endpoints = p.Endpoints
-
+	// If we've hit an error at this point, still run on schedule, but always return an error.
+	// This way the error is clearly communicated through to kibana.
+	// Since the error is not recoverable in these instances, the user will need to reconfigure
+	// the monitor, which will destroy and recreate it in heartbeat, thus clearing this error.
+	//
+	// Note: we do this at this point, and no earlier, because at a minimum we need the
+	// standard monitor fields (id, name and schedule) to deliver an error to kibana in a way
+	// that it can render.
 	if err != nil {
-		return m, fmt.Errorf("job err %v", err)
+		// Note, needed to hoist err to this scope, not just to add a prefix
+		fullErr := fmt.Errorf("job could not be initialized: %s", err)
+		// A placeholder job that always returns an error
+		p.Jobs = []jobs.Job{func(event *beat.Event) ([]jobs.Job, error) {
+			return nil, fullErr
+		}}
 	}
 
+	wrappedJobs := wrappers.WrapCommon(p.Jobs, m.stdFields)
+	m.endpoints = p.Endpoints
+
 	m.configuredJobs, err = m.makeTasks(config, wrappedJobs)
 	if err != nil {
 		return m, err
diff --git a/heartbeat/monitors/monitor_test.go b/heartbeat/monitors/monitor_test.go
index bbcd5b9b74c5..8184a867eaea 100644
--- a/heartbeat/monitors/monitor_test.go
+++ b/heartbeat/monitors/monitor_test.go
@@ -25,19 +25,49 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/elastic/beats/v7/heartbeat/scheduler"
+	"github.com/elastic/beats/v7/libbeat/common"
 	"github.com/elastic/beats/v7/libbeat/monitoring"
+	"github.com/elastic/go-lookslike"
+	"github.com/elastic/go-lookslike/isdef"
 	"github.com/elastic/go-lookslike/testslike"
+	"github.com/elastic/go-lookslike/validator"
 )
 
-func TestMonitor(t *testing.T) {
-	serverMonConf := mockPluginConf(t, "", "@every 1ms", "http://example.net")
+// TestMonitorBasic tests a basic config
+func TestMonitorBasic(t *testing.T) {
+	testMonitorConfig(
+		t,
+		mockPluginConf(t, "myId", "myName", "@every 1ms", "http://example.net"),
+		mockEventMonitorValidator("myId", "myName"),
+	)
+}
+
+// TestMonitorCfgError tests a config that errors out at plugin creation, but still has stdfields defined.
+// This should cause the monitor to run, but only produce error documents
+func TestMonitorCfgError(t *testing.T) {
+	testMonitorConfig(
+		t,
+		mockInvalidPluginConfWithStdFields(t, "invalidTestId", "invalidTestName", "@every 10s"),
+		lookslike.Compose(
+			baseMockEventMonitorValidator("invalidTestId", "invalidTestName", "down"),
+			lookslike.MustCompile(common.MapStr{
+				"error": common.MapStr{
+					"message": isdef.IsStringContaining("missing required field"),
+					"type":    "io",
+				},
+			}),
+		),
+	)
+}
+
+func testMonitorConfig(t *testing.T, conf *common.Config, eventValidator validator.Validator) {
 	reg, built, closed := mockPluginsReg()
 	pipelineConnector := &MockPipelineConnector{}
 
 	sched := scheduler.Create(1, monitoring.NewRegistry(), time.Local, nil, false)
 	defer sched.Stop()
 
-	mon, err := newMonitor(serverMonConf, reg, pipelineConnector, sched.Add, nil, false)
+	mon, err := newMonitor(conf, reg, pipelineConnector, sched.Add, nil, false)
 	require.NoError(t, err)
 
 	mon.Start()
@@ -56,7 +86,7 @@ func TestMonitor(t *testing.T) {
 			pcClient.Close()
 
 			for _, event := range pcClient.Publishes() {
-				testslike.Test(t, mockEventMonitorValidator(""), event.Fields)
+				testslike.Test(t, eventValidator, event.Fields)
 			}
 		} else {
 			// Let's yield this goroutine so we don't spin
diff --git a/heartbeat/monitors/stdfields/stdfields.go b/heartbeat/monitors/stdfields/stdfields.go
index f09161c2adf0..92e5bc4bb90f 100644
--- a/heartbeat/monitors/stdfields/stdfields.go
+++ b/heartbeat/monitors/stdfields/stdfields.go
@@ -18,10 +18,9 @@
 package stdfields
 
 import (
+	"fmt"
 	"time"
 
-	"github.com/pkg/errors"
-
 	"github.com/elastic/beats/v7/heartbeat/scheduler/schedule"
 	"github.com/elastic/beats/v7/libbeat/common"
 )
@@ -46,7 +45,7 @@ func ConfigToStdMonitorFields(config *common.Config) (StdMonitorFields, error) {
 	mpi := StdMonitorFields{Enabled: true}
 
 	if err := config.Unpack(&mpi); err != nil {
-		return mpi, errors.Wrap(err, "error unpacking monitor plugin config")
+		return mpi, fmt.Errorf("error unpacking monitor plugin config: %w", err)
 	}
 
 	// Use `service_name` if `service.name` is unspecified
diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go
index 11b013a98717..a7f1848b3aef 100644
--- a/heartbeat/monitors/task.go
+++ b/heartbeat/monitors/task.go
@@ -108,7 +108,7 @@ func runPublishJob(job jobs.Job, client *WrappedClient) []scheduler.TaskFunc {
 
 	conts, err := job(event)
 	if err != nil {
-		logp.Err("Job %v failed with: ", err)
+		logp.Err("Job failed with: %s", err)
 	}
 
 	hasContinuations := len(conts) > 0
diff --git a/heartbeat/tests/system/test_icmp.py b/heartbeat/tests/system/test_icmp.py
index 7f61a7430f88..f7be72e89f42 100644
--- a/heartbeat/tests/system/test_icmp.py
+++ b/heartbeat/tests/system/test_icmp.py
@@ -6,6 +6,7 @@
 import sys
 import time
 import unittest
+import re
 from beat.beat import INTEGRATION_TESTS
 from elasticsearch import Elasticsearch
 from heartbeat import BaseTest
@@ -35,17 +36,10 @@ def test_base(self):
 
         proc = self.start_beat()
 
-        def has_started_message(): return self.log_contains("ICMP loop successfully initialized")
-
-        def has_failed_message(): return self.log_contains("Failed to initialize ICMP loop")
-
-        # We don't know if the system tests are running is configured to support or not support ping, but we can at least check that the ICMP loop
-        # was initiated. In the future we should start up VMs with the correct perms configured and be more specific. In addition to that
-        # we should run pings on those machines and make sure they work.
-        self.wait_until(lambda: has_started_message() or has_failed_message(), 30)
-
-        if has_failed_message():
-            proc.check_kill_and_wait(1)
-        else:
-            # Check that documents are moving through
-            self.wait_until(lambda: self.output_has(lines=1))
+        # because we have no way of knowing if the current environment has the ability to do ICMP pings
+        # we are instead asserting the monitor's status via the output and checking for errors where appropriate
+        self.wait_until(lambda: self.output_has(lines=1))
+        output = self.read_output()
+        monitor_status = output[0]["monitor.status"]
+        assert monitor_status == "up" or monitor_status == "down"
+        assert output[0]["monitor.type"] == "icmp"
diff --git a/x-pack/heartbeat/.gitignore b/x-pack/heartbeat/.gitignore
new file mode 100644
index 000000000000..8af6c73dc612
--- /dev/null
+++ b/x-pack/heartbeat/.gitignore
@@ -0,0 +1 @@
+.synthetics
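
Reviewer sketch (not part of the patch): the comment added to newMonitorUnsafe in heartbeat/monitors/monitor.go describes deferring a non-recoverable setup error by installing a job that always fails, so the error is published on every scheduled run instead of blocking monitor creation. The standalone Go program below illustrates that pattern under simplified, assumed types; Job and makeJobs are hypothetical stand-ins for heartbeat's jobs.Job and plugin construction, not the real API.

package main

import (
	"errors"
	"fmt"
)

// Job is a hypothetical stand-in for heartbeat's jobs.Job: it yields a status
// string and possibly an error.
type Job func() (status string, err error)

// makeJobs simulates plugin construction that may fail, e.g. on a config that
// is missing a required field.
func makeJobs(cfgOK bool) ([]Job, error) {
	if !cfgOK {
		return nil, errors.New("missing required field 'urls'")
	}
	return []Job{func() (string, error) { return "up", nil }}, nil
}

func main() {
	js, err := makeJobs(false)
	if err != nil {
		// Rather than aborting monitor creation, install a placeholder job that
		// always returns the construction error. Every scheduled run then
		// publishes an error document, making the failure visible downstream.
		fullErr := fmt.Errorf("job could not be initialized: %w", err)
		js = []Job{func() (string, error) { return "down", fullErr }}
	}

	// Simulate one scheduled run of each job.
	for _, j := range js {
		status, jerr := j()
		fmt.Printf("status=%s err=%v\n", status, jerr)
	}
}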