Skip to content

Commit

Permalink
[Heartbeat] Defer monitor / ICMP errors to monitor runtime / ES (#29413)
Browse files Browse the repository at this point in the history
This PR generally improves the error behavior of all monitors, and some specific ICMP related errors as well. These two items are combined in one PR because the general theme here is improving the ICMP error experience, and improving ICMP required improving all monitors.

Fixes #29346
and incremental progress toward #29692

General monitor improvements
Generally speaking, per #29692 we are trying to send monitor output to ES wherever possible. With this PR we now send any monitor initialization errors (such as a lack of ICMP kernel capabilities) during monitor creation to ES. We do this by allowing the monitor to initialize and run on schedule, even though we know it will always send the same error message. This lets users more easily debug issues in Kibana.

ICMP Specific Improvement
This PR also Removes broken a IP capability check that caused heartbeat to be unable to start. We now just rely on return codes from attempts to actually send packets. This is the more specific fix for #29346 . I was not able to exactly reproduce the exact customer reported issue, where the user somehow disabled ipv6 in a way that the ICMP loop that I can't exactly reproduce. I tried disabling ipv6 fully with sudo sysctl net.ipv6.conf.all.disable_ipv6=1 but that didn't yield the error in #29346

The logic is now simplified, there's no truly reliable way to know if you can send an ipv6 (or ipv4) ping before you send it (settings can change at any time! network cards can disappear!), so we just let the error codes happen as the check is executed. This is also generally a better UX in that the errors will now be visible in the Uptime app, not just the logs.

It should be noted that the ipv4 and ipv6 boolean options only are documented to affect how DNS lookups happen. With this change the behavior matches the docs.

Note that ICMP is a bit weird in that there's a single ICMP loop in heartbeat, and all monitors are really just interacting with that.

Removal of .synthetics
This also ignores the .synthetics folder which has been inconvenient for some time for devs, in that it dirties the git path

(cherry picked from commit 616db13)
  • Loading branch information
andrewvc authored and mergify-bot committed Jan 18, 2022
1 parent 9510d24 commit 60aed56
Show file tree
Hide file tree
Showing 12 changed files with 119 additions and 68 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Heartbeat*

- Fix race condition in http monitors using `mode:all` that can cause crashes. {pull}29697[pull]
- Fix broken ICMP availability check that prevented heartbeat from starting in rare cases. {pull}29413[pull]

*Journalbeat*

Expand Down Expand Up @@ -160,6 +161,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Heartbeat*

- More errors are now visible in ES with new logic failing monitors later to ease debugging. {pull}29413[pull]


*Metricbeat*

Expand Down
4 changes: 0 additions & 4 deletions heartbeat/monitors/active/icmp/icmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,6 @@ func (jf *jobFactory) checkConfig() error {
}

func (jf *jobFactory) makePlugin() (plugin2 plugin.Plugin, err error) {
if err := jf.loop.checkNetworkMode(jf.ipVersion); err != nil {
return plugin.Plugin{}, err
}

pingFactory := jf.pingIPFactory(&jf.config)

var j []jobs.Job
Expand Down
1 change: 0 additions & 1 deletion heartbeat/monitors/active/icmp/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
)

type ICMPLoop interface {
checkNetworkMode(mode string) error
ping(
addr *net.IPAddr,
timeout time.Duration,
Expand Down
34 changes: 9 additions & 25 deletions heartbeat/monitors/active/icmp/stdloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package icmp
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"net"
Expand Down Expand Up @@ -159,29 +158,6 @@ func newICMPLoop() (*stdICMPLoop, error) {
return l, nil
}

func (l *stdICMPLoop) checkNetworkMode(mode string) error {
ip4, ip6 := false, false
switch mode {
case "ip4":
ip4 = true
case "ip6":
ip6 = true
case "ip":
ip4, ip6 = true, true
default:
return fmt.Errorf("'%v' is not supported", mode)
}

if ip4 && l.conn4 == nil {
return errors.New("failed to initiate IPv4 support. Check log details for permission configuration")
}
if ip6 && l.conn6 == nil {
return errors.New("failed to initiate IPv6 support. Check log details for permission configuration")
}

return nil
}

func (l *stdICMPLoop) runICMPRecv(conn *icmp.PacketConn, proto int) {
for {
bytes := make([]byte, 512)
Expand Down Expand Up @@ -251,6 +227,14 @@ func (l *stdICMPLoop) ping(
timeout time.Duration,
interval time.Duration,
) (time.Duration, int, error) {
isIPv6 := addr.IP.To4() == nil
if isIPv6 && l.conn6 == nil {
return -1, -1, fmt.Errorf("cannot ping IPv6 address '%s', no IPv6 connection available", addr)
}
if !isIPv6 && l.conn4 == nil {
return -1, -1, fmt.Errorf("cannot ping IPv4 address '%s', no IPv4 connection available", addr)
}

var err error
toTimer := time.NewTimer(timeout)
defer toTimer.Stop()
Expand Down Expand Up @@ -379,7 +363,7 @@ func (l *stdICMPLoop) sendEchoRequest(addr *net.IPAddr) (*requestContext, error)

_, err := conn.WriteTo(encoded, addr)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not write to conn: %w", err)
}

ctx.ts = ts
Expand Down
7 changes: 4 additions & 3 deletions heartbeat/monitors/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestPreProcessors(t *testing.T) {
}

func TestDuplicateMonitorIDs(t *testing.T) {
serverMonConf := mockPluginConf(t, "custom", "@every 1ms", "http://example.net")
serverMonConf := mockPluginConf(t, "custom", "custom", "@every 1ms", "http://example.net")
badConf := mockBadPluginConf(t, "custom", "@every 1ms")
reg, built, closed := mockPluginsReg()
pipelineConnector := &MockPipelineConnector{}
Expand Down Expand Up @@ -190,8 +190,9 @@ func TestDuplicateMonitorIDs(t *testing.T) {
m1.Stop()
m2.Stop()

// 3 are counted as built, even the bad config
require.Equal(t, 3, built.Load())
// Two are counted as built. The bad config is missing a stdfield so it
// doesn't complete construction
require.Equal(t, 2, built.Load())
// Only 2 closes, because the bad config isn't closed
require.Equal(t, 2, closed.Load())
}
61 changes: 41 additions & 20 deletions heartbeat/monitors/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,24 +99,28 @@ func (pc *MockPipelineConnector) ConnectWith(beat.ClientConfig) (beat.Client, er
return c, nil
}

func mockEventMonitorValidator(id string) validator.Validator {
func baseMockEventMonitorValidator(id string, name string, status string) validator.Validator {
var idMatcher isdef.IsDef
if id == "" {
idMatcher = isdef.IsStringMatching(regexp.MustCompile(`^auto-test-.*`))
} else {
idMatcher = isdef.IsEqual(id)
}
return lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{
"id": idMatcher,
"name": name,
"type": "test",
"duration.us": isdef.IsDuration,
"status": status,
"check_group": isdef.IsString,
},
})
}

func mockEventMonitorValidator(id string, name string) validator.Validator {
return lookslike.Strict(lookslike.Compose(
lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{
"id": idMatcher,
"name": "",
"type": "test",
"duration.us": isdef.IsDuration,
"status": "up",
"check_group": isdef.IsString,
},
}),
baseMockEventMonitorValidator(id, name, "up"),
hbtestllext.MonitorTimespanValidator,
hbtest.SummaryChecks(1, 0),
lookslike.MustCompile(mockEventCustomFields()),
Expand Down Expand Up @@ -151,15 +155,19 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) {
unpacked := struct {
URLs []string `config:"urls" validate:"required"`
}{}
err := config.Unpack(&unpacked)
if err != nil {
return plugin.Plugin{}, err
}
j, err := createMockJob()

// track all closes, even on error
closer := func() error {
closed.Inc()
return nil
}

err := config.Unpack(&unpacked)
if err != nil {
return plugin.Plugin{DoClose: closer}, err
}
j, err := createMockJob()

return plugin.Plugin{Jobs: j, DoClose: closer, Endpoints: 1}, err
},
Stats: plugin.NewPluginCountersRecorder("test", reg)},
Expand All @@ -174,13 +182,15 @@ func mockPluginsReg() (p *plugin.PluginsReg, built *atomic.Int, closed *atomic.I
return reg, built, closed
}

func mockPluginConf(t *testing.T, id string, schedule string, url string) *common.Config {
func mockPluginConf(t *testing.T, id string, name string, schedule string, url string) *common.Config {
confMap := map[string]interface{}{
"type": "test",
"urls": []string{url},
"schedule": schedule,
"name": name,
}

// Optional to let us simulate this key missing
if id != "" {
confMap["id"] = id
}
Expand All @@ -197,7 +207,6 @@ func mockBadPluginConf(t *testing.T, id string, schedule string) *common.Config
confMap := map[string]interface{}{
"type": "test",
"notanoption": []string{"foo"},
"schedule": schedule,
}

if id != "" {
Expand All @@ -210,8 +219,6 @@ func mockBadPluginConf(t *testing.T, id string, schedule string) *common.Config
return conf
}

// mockInvalidPlugin conf returns a config that invalid at the basic level of
// what's expected in heartbeat, i.e. no type.
func mockInvalidPluginConf(t *testing.T) *common.Config {
confMap := map[string]interface{}{
"hoeutnheou": "oueanthoue",
Expand All @@ -222,3 +229,17 @@ func mockInvalidPluginConf(t *testing.T) *common.Config {

return conf
}

func mockInvalidPluginConfWithStdFields(t *testing.T, id string, name string, schedule string) *common.Config {
confMap := map[string]interface{}{
"type": "test",
"id": id,
"name": name,
"schedule": schedule,
}

conf, err := common.NewConfigFrom(confMap)
require.NoError(t, err)

return conf
}
21 changes: 17 additions & 4 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,26 @@ func newMonitorUnsafe(
return p.Close()
}

wrappedJobs := wrappers.WrapCommon(p.Jobs, m.stdFields)
m.endpoints = p.Endpoints

// If we've hit an error at this point, still run on schedule, but always return an error.
// This way the error is clearly communicated through to kibana.
// Since the error is not recoverable in these instances, the user will need to reconfigure
// the monitor, which will destroy and recreate it in heartbeat, thus clearing this error.
//
// Note: we do this at this point, and no earlier, because at a minimum we need the
// standard monitor fields (id, name and schedule) to deliver an error to kibana in a way
// that it can render.
if err != nil {
return m, fmt.Errorf("job err %v", err)
// Note, needed to hoist err to this scope, not just to add a prefix
fullErr := fmt.Errorf("job could not be initialized: %s", err)
// A placeholder job that always returns an error
p.Jobs = []jobs.Job{func(event *beat.Event) ([]jobs.Job, error) {
return nil, fullErr
}}
}

wrappedJobs := wrappers.WrapCommon(p.Jobs, m.stdFields)
m.endpoints = p.Endpoints

m.configuredJobs, err = m.makeTasks(config, wrappedJobs)
if err != nil {
return m, err
Expand Down
38 changes: 34 additions & 4 deletions heartbeat/monitors/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,49 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/go-lookslike"
"github.com/elastic/go-lookslike/isdef"
"github.com/elastic/go-lookslike/testslike"
"github.com/elastic/go-lookslike/validator"
)

func TestMonitor(t *testing.T) {
serverMonConf := mockPluginConf(t, "", "@every 1ms", "http://example.net")
// TestMonitorBasic tests a basic config
func TestMonitorBasic(t *testing.T) {
testMonitorConfig(
t,
mockPluginConf(t, "myId", "myName", "@every 1ms", "http://example.net"),
mockEventMonitorValidator("myId", "myName"),
)
}

// TestMonitorBasic tests a config that errors out at plugin creation, but still has stdfields defined.
// This should cause the monitor to run, but only produce error documents
func TestMonitorCfgError(t *testing.T) {
testMonitorConfig(
t,
mockInvalidPluginConfWithStdFields(t, "invalidTestId", "invalidTestName", "@every 10s"),
lookslike.Compose(
baseMockEventMonitorValidator("invalidTestId", "invalidTestName", "down"),
lookslike.MustCompile(common.MapStr{
"error": common.MapStr{
"message": isdef.IsStringContaining("missing required field"),
"type": "io",
},
}),
),
)
}

func testMonitorConfig(t *testing.T, conf *common.Config, eventValidator validator.Validator) {
reg, built, closed := mockPluginsReg()
pipelineConnector := &MockPipelineConnector{}

sched := scheduler.Create(1, monitoring.NewRegistry(), time.Local, nil, false)
defer sched.Stop()

mon, err := newMonitor(serverMonConf, reg, pipelineConnector, sched.Add, nil, false)
mon, err := newMonitor(conf, reg, pipelineConnector, sched.Add, nil, false)
require.NoError(t, err)

mon.Start()
Expand All @@ -56,7 +86,7 @@ func TestMonitor(t *testing.T) {
pcClient.Close()

for _, event := range pcClient.Publishes() {
testslike.Test(t, mockEventMonitorValidator(""), event.Fields)
testslike.Test(t, eventValidator, event.Fields)
}
} else {
// Let's yield this goroutine so we don't spin
Expand Down
5 changes: 2 additions & 3 deletions heartbeat/monitors/stdfields/stdfields.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
package stdfields

import (
"fmt"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/heartbeat/scheduler/schedule"
"github.com/elastic/beats/v7/libbeat/common"
)
Expand All @@ -46,7 +45,7 @@ func ConfigToStdMonitorFields(config *common.Config) (StdMonitorFields, error) {
mpi := StdMonitorFields{Enabled: true}

if err := config.Unpack(&mpi); err != nil {
return mpi, errors.Wrap(err, "error unpacking monitor plugin config")
return mpi, fmt.Errorf("error unpacking monitor plugin config: %w", err)
}

// Use `service_name` if `service.name` is unspecified
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func runPublishJob(job jobs.Job, client *WrappedClient) []scheduler.TaskFunc {

conts, err := job(event)
if err != nil {
logp.Err("Job %v failed with: ", err)
logp.Err("Job failed with: %s", err)
}

hasContinuations := len(conts) > 0
Expand Down
10 changes: 7 additions & 3 deletions heartbeat/tests/system/test_icmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import sys
import time
import unittest
import re
from beat.beat import INTEGRATION_TESTS
from elasticsearch import Elasticsearch
from heartbeat import BaseTest
Expand Down Expand Up @@ -44,8 +45,11 @@ def has_failed_message(): return self.log_contains("Failed to initialize ICMP lo
# we should run pings on those machines and make sure they work.
self.wait_until(lambda: has_started_message() or has_failed_message(), 30)

self.wait_until(lambda: self.output_has(lines=1))
output = self.read_output()
monitor_status = output[0]["monitor.status"]
if has_failed_message():
proc.check_kill_and_wait(1)
assert monitor_status == "down"
self.assertRegex(output[0]["error.message"], ".*Insufficient privileges to perform ICMP ping.*")
else:
# Check that documents are moving through
self.wait_until(lambda: self.output_has(lines=1))
assert monitor_status == "up"
1 change: 1 addition & 0 deletions x-pack/heartbeat/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.synthetics

0 comments on commit 60aed56

Please sign in to comment.