Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Defer monitor / ICMP errors to monitor runtime / ES #29413

Merged
merged 12 commits into from
Jan 18, 2022
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Heartbeat*

- Fix race condition in http monitors using `mode:all` that can cause crashes. {pull}29697[pull]
- Fix broken ICMP availability check that prevented heartbeat from starting in rare cases. {pull}29413[pull]

*Metricbeat*

Expand Down Expand Up @@ -167,6 +168,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Heartbeat*

- More errors are now visible in ES with new logic failing monitors later to ease debugging. {pull}29413[pull]


*Metricbeat*

Expand Down
4 changes: 0 additions & 4 deletions heartbeat/monitors/active/icmp/icmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,6 @@ func (jf *jobFactory) checkConfig() error {
}

func (jf *jobFactory) makePlugin() (plugin2 plugin.Plugin, err error) {
if err := jf.loop.checkNetworkMode(jf.ipVersion); err != nil {
Copy link
Contributor Author

@andrewvc andrewvc Jan 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now checked in stdloop.go, we don't want to check at plugin initialization, just at exec time

return plugin.Plugin{}, err
}

pingFactory := jf.pingIPFactory(&jf.config)

var j []jobs.Job
Expand Down
1 change: 0 additions & 1 deletion heartbeat/monitors/active/icmp/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
)

type ICMPLoop interface {
checkNetworkMode(mode string) error
ping(
addr *net.IPAddr,
timeout time.Duration,
Expand Down
34 changes: 9 additions & 25 deletions heartbeat/monitors/active/icmp/stdloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package icmp
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"net"
Expand Down Expand Up @@ -159,29 +158,6 @@ func newICMPLoop() (*stdICMPLoop, error) {
return l, nil
}

func (l *stdICMPLoop) checkNetworkMode(mode string) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to main loop icmp recv so we don't interfere with heartbeat initialization.

ip4, ip6 := false, false
switch mode {
case "ip4":
ip4 = true
case "ip6":
ip6 = true
case "ip":
ip4, ip6 = true, true
default:
return fmt.Errorf("'%v' is not supported", mode)
}

if ip4 && l.conn4 == nil {
return errors.New("failed to initiate IPv4 support. Check log details for permission configuration")
}
if ip6 && l.conn6 == nil {
return errors.New("failed to initiate IPv6 support. Check log details for permission configuration")
}

return nil
}

func (l *stdICMPLoop) runICMPRecv(conn *icmp.PacketConn, proto int) {
for {
bytes := make([]byte, 512)
Expand Down Expand Up @@ -251,6 +227,14 @@ func (l *stdICMPLoop) ping(
timeout time.Duration,
interval time.Duration,
) (time.Duration, int, error) {
isIPv6 := addr.IP.To4() == nil
if isIPv6 && l.conn6 == nil {
return -1, -1, fmt.Errorf("cannot ping IPv6 address '%s', no IPv6 connection available", addr)
}
if !isIPv6 && l.conn4 == nil {
return -1, -1, fmt.Errorf("cannot ping IPv4 address '%s', no IPv4 connection available", addr)
}

var err error
toTimer := time.NewTimer(timeout)
defer toTimer.Stop()
Expand Down Expand Up @@ -379,7 +363,7 @@ func (l *stdICMPLoop) sendEchoRequest(addr *net.IPAddr) (*requestContext, error)

_, err := conn.WriteTo(encoded, addr)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not write to conn: %w", err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more specific error context is always nice

}

ctx.ts = ts
Expand Down
7 changes: 4 additions & 3 deletions heartbeat/monitors/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestPreProcessors(t *testing.T) {
}

func TestDuplicateMonitorIDs(t *testing.T) {
serverMonConf := mockPluginConf(t, "custom", "@every 1ms", "http://example.net")
serverMonConf := mockPluginConf(t, "custom", "custom", "@every 1ms", "http://example.net")
badConf := mockBadPluginConf(t, "custom", "@every 1ms")
reg, built, closed := mockPluginsReg()
pipelineConnector := &MockPipelineConnector{}
Expand Down Expand Up @@ -190,8 +190,9 @@ func TestDuplicateMonitorIDs(t *testing.T) {
m1.Stop()
m2.Stop()

// 3 are counted as built, even the bad config
require.Equal(t, 3, built.Load())
// Two are counted as built. The bad config is missing a stdfield so it
// doesn't complete construction
require.Equal(t, 2, built.Load())
// Only 2 closes, because the bad config isn't closed
require.Equal(t, 2, closed.Load())
}
61 changes: 41 additions & 20 deletions heartbeat/monitors/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,24 +99,28 @@ func (pc *MockPipelineConnector) ConnectWith(beat.ClientConfig) (beat.Client, er
return c, nil
}

func mockEventMonitorValidator(id string) validator.Validator {
func baseMockEventMonitorValidator(id string, name string, status string) validator.Validator {
var idMatcher isdef.IsDef
if id == "" {
idMatcher = isdef.IsStringMatching(regexp.MustCompile(`^auto-test-.*`))
} else {
idMatcher = isdef.IsEqual(id)
}
return lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{
"id": idMatcher,
"name": name,
"type": "test",
"duration.us": isdef.IsDuration,
"status": status,
"check_group": isdef.IsString,
},
})
}

func mockEventMonitorValidator(id string, name string) validator.Validator {
return lookslike.Strict(lookslike.Compose(
lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{
"id": idMatcher,
"name": "",
"type": "test",
"duration.us": isdef.IsDuration,
"status": "up",
"check_group": isdef.IsString,
},
}),
baseMockEventMonitorValidator(id, name, "up"),
hbtestllext.MonitorTimespanValidator,
hbtest.SummaryChecks(1, 0),
lookslike.MustCompile(mockEventCustomFields()),
Expand Down Expand Up @@ -151,15 +155,19 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) {
unpacked := struct {
URLs []string `config:"urls" validate:"required"`
}{}
err := config.Unpack(&unpacked)
if err != nil {
return plugin.Plugin{}, err
}
j, err := createMockJob()

// track all closes, even on error
closer := func() error {
closed.Inc()
return nil
}

err := config.Unpack(&unpacked)
if err != nil {
return plugin.Plugin{DoClose: closer}, err
}
j, err := createMockJob()

return plugin.Plugin{Jobs: j, DoClose: closer, Endpoints: 1}, err
},
Stats: plugin.NewPluginCountersRecorder("test", reg)},
Expand All @@ -174,13 +182,15 @@ func mockPluginsReg() (p *plugin.PluginsReg, built *atomic.Int, closed *atomic.I
return reg, built, closed
}

func mockPluginConf(t *testing.T, id string, schedule string, url string) *common.Config {
func mockPluginConf(t *testing.T, id string, name string, schedule string, url string) *common.Config {
confMap := map[string]interface{}{
"type": "test",
"urls": []string{url},
"schedule": schedule,
"name": name,
}

// Optional to let us simulate this key missing
if id != "" {
confMap["id"] = id
}
Expand All @@ -197,7 +207,6 @@ func mockBadPluginConf(t *testing.T, id string, schedule string) *common.Config
confMap := map[string]interface{}{
"type": "test",
"notanoption": []string{"foo"},
"schedule": schedule,
}

if id != "" {
Expand All @@ -210,8 +219,6 @@ func mockBadPluginConf(t *testing.T, id string, schedule string) *common.Config
return conf
}

// mockInvalidPlugin conf returns a config that invalid at the basic level of
// what's expected in heartbeat, i.e. no type.
func mockInvalidPluginConf(t *testing.T) *common.Config {
confMap := map[string]interface{}{
"hoeutnheou": "oueanthoue",
Expand All @@ -222,3 +229,17 @@ func mockInvalidPluginConf(t *testing.T) *common.Config {

return conf
}

func mockInvalidPluginConfWithStdFields(t *testing.T, id string, name string, schedule string) *common.Config {
confMap := map[string]interface{}{
"type": "test",
"id": id,
"name": name,
"schedule": schedule,
}

conf, err := common.NewConfigFrom(confMap)
require.NoError(t, err)

return conf
}
21 changes: 17 additions & 4 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,26 @@ func newMonitorUnsafe(
return p.Close()
}

wrappedJobs := wrappers.WrapCommon(p.Jobs, m.stdFields)
m.endpoints = p.Endpoints

// If we've hit an error at this point, still run on schedule, but always return an error.
// This way the error is clearly communicated through to kibana.
// Since the error is not recoverable in these instances, the user will need to reconfigure
// the monitor, which will destroy and recreate it in heartbeat, thus clearing this error.
//
// Note: we do this at this point, and no earlier, because at a minimum we need the
// standard monitor fields (id, name and schedule) to deliver an error to kibana in a way
// that it can render.
if err != nil {
return m, fmt.Errorf("job err %v", err)
// Note, needed to hoist err to this scope, not just to add a prefix
fullErr := fmt.Errorf("job could not be initialized: %s", err)
// A placeholder job that always returns an error
p.Jobs = []jobs.Job{func(event *beat.Event) ([]jobs.Job, error) {
return nil, fullErr
}}
Comment on lines +175 to +180
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great! Thanks for the helpful comment on the top too. This monitor initialisation part is really elegant, but having the comment on the top does help a lot given the further wrapping and how the jobs then get queued.

}

wrappedJobs := wrappers.WrapCommon(p.Jobs, m.stdFields)
m.endpoints = p.Endpoints

m.configuredJobs, err = m.makeTasks(config, wrappedJobs)
if err != nil {
return m, err
Expand Down
38 changes: 34 additions & 4 deletions heartbeat/monitors/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,49 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/go-lookslike"
"github.com/elastic/go-lookslike/isdef"
"github.com/elastic/go-lookslike/testslike"
"github.com/elastic/go-lookslike/validator"
)

func TestMonitor(t *testing.T) {
serverMonConf := mockPluginConf(t, "", "@every 1ms", "http://example.net")
// TestMonitorBasic tests a basic config
func TestMonitorBasic(t *testing.T) {
testMonitorConfig(
t,
mockPluginConf(t, "myId", "myName", "@every 1ms", "http://example.net"),
mockEventMonitorValidator("myId", "myName"),
)
}

// TestMonitorBasic tests a config that errors out at plugin creation, but still has stdfields defined.
// This should cause the monitor to run, but only produce error documents
func TestMonitorCfgError(t *testing.T) {
testMonitorConfig(
t,
mockInvalidPluginConfWithStdFields(t, "invalidTestId", "invalidTestName", "@every 10s"),
lookslike.Compose(
baseMockEventMonitorValidator("invalidTestId", "invalidTestName", "down"),
lookslike.MustCompile(common.MapStr{
"error": common.MapStr{
"message": isdef.IsStringContaining("missing required field"),
"type": "io",
},
}),
),
)
}

func testMonitorConfig(t *testing.T, conf *common.Config, eventValidator validator.Validator) {
reg, built, closed := mockPluginsReg()
pipelineConnector := &MockPipelineConnector{}

sched := scheduler.Create(1, monitoring.NewRegistry(), time.Local, nil, false)
defer sched.Stop()

mon, err := newMonitor(serverMonConf, reg, pipelineConnector, sched.Add, nil, false)
mon, err := newMonitor(conf, reg, pipelineConnector, sched.Add, nil, false)
require.NoError(t, err)

mon.Start()
Expand All @@ -56,7 +86,7 @@ func TestMonitor(t *testing.T) {
pcClient.Close()

for _, event := range pcClient.Publishes() {
testslike.Test(t, mockEventMonitorValidator(""), event.Fields)
testslike.Test(t, eventValidator, event.Fields)
}
} else {
// Let's yield this goroutine so we don't spin
Expand Down
5 changes: 2 additions & 3 deletions heartbeat/monitors/stdfields/stdfields.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
package stdfields

import (
"fmt"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/heartbeat/scheduler/schedule"
"github.com/elastic/beats/v7/libbeat/common"
)
Expand All @@ -46,7 +45,7 @@ func ConfigToStdMonitorFields(config *common.Config) (StdMonitorFields, error) {
mpi := StdMonitorFields{Enabled: true}

if err := config.Unpack(&mpi); err != nil {
return mpi, errors.Wrap(err, "error unpacking monitor plugin config")
return mpi, fmt.Errorf("error unpacking monitor plugin config: %w", err)
}

// Use `service_name` if `service.name` is unspecified
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func runPublishJob(job jobs.Job, client *WrappedClient) []scheduler.TaskFunc {

conts, err := job(event)
if err != nil {
logp.Err("Job %v failed with: ", err)
logp.Err("Job failed with: %s", err)
}

hasContinuations := len(conts) > 0
Expand Down
11 changes: 8 additions & 3 deletions heartbeat/tests/system/test_icmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import sys
import time
import unittest
import re
from beat.beat import INTEGRATION_TESTS
from elasticsearch import Elasticsearch
from heartbeat import BaseTest
Expand Down Expand Up @@ -44,8 +45,12 @@ def has_failed_message(): return self.log_contains("Failed to initialize ICMP lo
# we should run pings on those machines and make sure they work.
self.wait_until(lambda: has_started_message() or has_failed_message(), 30)

self.wait_until(lambda: self.output_has(lines=1))
output = self.read_output()
monitor_status = output[0]["monitor.status"]
monitor_error = output[0]["error.message"]
if has_failed_message():
proc.check_kill_and_wait(1)
assert monitor_status == "down"
self.assertRegex(monitor_error, ".*Insufficient privileges to perform ICMP ping.*")
else:
# Check that documents are moving through
self.wait_until(lambda: self.output_has(lines=1))
assert monitor_status == "up"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've seen we have the following comment in the code to explain why we have these different assertions, which makes this test really test different things depending on the underlying environment:

We don't know if the system tests are running is configured to support or not support ping, but we can at least check that the ICMP loop was initiated. In the future we should start up VMs with the correct perms configured and be more specific. In addition to that we should run pings on those machines and make sure they work.

However, I wonder whether we should just add a further verification above to check whether we can perform ICMP pings and then we can use that to decide which logs and statuses we'll look for.

Right now we use Heartbeat's logs to assert on its behaviour, while the suggestion above would help us use Python to look into the underlying platform and then assert on Heartbeat's behaviour. In other words, even though it would be a conditional test, its conditions would not depend only on Heartbeat.

This is not a blocking comment btw, and you can consider this to be outside the scope of this PR if you judge it to be unnecessary for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think it's a good point for a follow-up PR perhaps.

The only to determine if you can do a ping is to actually do one sadly or maybe to use getcap.

The tricky thing from a CI perspective is you need to see if you have permission, somehow, to run sudo setcap cap_net_raw+eip on the heartbeat binary. I think we'd need to execute this from python as well.

We probably need this logic regardless because on most people's laptops the no pings behavior will be the default.

I'm not averse to us reaching out to robots to fix this, but I don't think it's a priority given the ultra-low churn WRT the ICMP code.

1 change: 1 addition & 0 deletions x-pack/heartbeat/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.synthetics
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙏