Skip to content

Commit

Permalink
[heartbeat] Set IDs explicitly (#9697)
Browse files Browse the repository at this point in the history
This patch lets you set the `monitor.id` field from a monitor's config.

It also redefines the specific meaning of heartbeat's `monitor.id` field. It also tightens up the definition of the `monitor.name` field in events.
  • Loading branch information
andrewvc authored Jan 7, 2019
1 parent 2d58140 commit 6dd4508
Show file tree
Hide file tree
Showing 27 changed files with 311 additions and 270 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Heartbeat*

- Remove monitor generator script that was rarely used. {pull}9648[9648]
- Monitor IDs are now configurable. Auto-generated monitor IDs now use a different formula based on a hash of their config values. If you wish to have continuity with the old format of monitor IDs, you'll need to set the `id` property explicitly. {pull}9697[9697]

*Journalbeat*

Expand Down
2 changes: 1 addition & 1 deletion heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) {
func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) {
// Check monitor configs
if err := bt.monitorReloader.Check(bt.dynamicFactory); err != nil {
return err
logp.Error(errors.Wrap(err, "error loading reloadable monitors"))
}

// Execute the monitor
Expand Down
17 changes: 15 additions & 2 deletions heartbeat/docs/heartbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,26 @@ expected response. See <<monitor-http-options>>.
The `tcp` and `http` monitor types both support SSL/TLS and some proxy
settings.

[float]
[[monitor-id]]
==== `id`

A unique identifier for this configuration. This should not change with edits to the monitor configuration,
regardless of changes to any config fields. Examples: `uploader-service`, `http://example.net`, `us-west-loadbalancer`. Note that this uniqueness applies only within a given Heartbeat instance. If you want to monitor the same endpoint from multiple locations, it is recommended that those Heartbeat instances use the same IDs so that their results can be correlated. You can use the `host.geo.name` property to disambiguate them.

When querying against indexed monitor data, this is the field you will be aggregating on. Appears in the
<<exported-fields,exported fields>> as `monitor.id`.

If you do not set this explicitly, the monitor's config will be hashed and the generated value will be used. This value will
change whenever any option of this monitor changes, making aggregations over time across such changes impossible. For this reason,
it is recommended that you set this manually.

[float]
[[monitor-name]]
==== `name`

The monitor name. This value appears in the <<exported-fields,exported fields>>
under the `monitor` field as the job name and the `type` field as the job type.
Optional human-readable name for this monitor. This value appears in the <<exported-fields,exported fields>>
as `monitor.name`.

[float]
[[monitor-enabled]]
Expand Down
3 changes: 1 addition & 2 deletions heartbeat/hbtest/hbtestutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,13 @@ func TLSChecks(chainIndex, certIndex int, certificate *x509.Certificate) mapval.

// MonitorChecks creates a mapval.Validator that represents the "monitor" field present
// in all heartbeat events.
func MonitorChecks(id string, host string, ip string, scheme string, status string) mapval.Validator {
func MonitorChecks(host string, ip string, scheme string, status string) mapval.Validator {
return mapval.MustCompile(mapval.Map{
"monitor": mapval.Map{
// TODO: This is only optional because, for some reason, TCP returns
// this value, but HTTP does not. We should fix this
"host": mapval.Optional(mapval.IsEqual(host)),
"duration.us": mapval.IsDuration,
"id": id,
"ip": ip,
"scheme": scheme,
"status": status,
Expand Down
22 changes: 5 additions & 17 deletions heartbeat/monitors/active/dialchain/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package dialchain

import (
"fmt"
"net"
"strconv"
"time"
Expand Down Expand Up @@ -130,14 +129,14 @@ func (b *Builder) Run(
// correctly resolved endpoint.
func MakeDialerJobs(
b *Builder,
typ, scheme string,
scheme string,
endpoints []Endpoint,
mode monitors.IPSettings,
fn func(event *beat.Event, dialer transport.Dialer, addr string) error,
) ([]monitors.Job, error) {
var jobs []monitors.Job
for _, endpoint := range endpoints {
endpointJobs, err := makeEndpointJobs(b, typ, scheme, endpoint, mode, fn)
endpointJobs, err := makeEndpointJobs(b, scheme, endpoint, mode, fn)
if err != nil {
return nil, err
}
Expand All @@ -149,7 +148,7 @@ func MakeDialerJobs(

func makeEndpointJobs(
b *Builder,
typ, scheme string,
scheme string,
endpoint Endpoint,
mode monitors.IPSettings,
fn func(*beat.Event, transport.Dialer, string) error,
Expand Down Expand Up @@ -180,8 +179,7 @@ func makeEndpointJobs(

// Create job that first resolves one or multiple IPs (depending on
// config.Mode) in order to create one continuation Task per IP.
jobID := jobID(typ, scheme, endpoint.Host, endpoint.Ports)
settings := monitors.MakeHostJobSettings(jobID, endpoint.Host, mode)
settings := monitors.MakeHostJobSettings(endpoint.Host, mode)

job, err := monitors.MakeByHostJob(settings,
monitors.MakePingAllIPPortFactory(endpoint.Ports,
Expand All @@ -199,15 +197,5 @@ func makeEndpointJobs(
if err != nil {
return nil, err
}
return []monitors.Job{monitors.WithJobId(jobID, monitors.WithFields(fields, job))}, nil
}

func jobID(typ, jobType, host string, ports []uint16) string {
var h string
if len(ports) == 1 {
h = fmt.Sprintf("%v:%v", host, ports[0])
} else {
h = fmt.Sprintf("%v:%v", host, ports)
}
return fmt.Sprintf("%v-%v@%v", typ, jobType, h)
return []monitors.Job{monitors.WithFields(fields, job)}, nil
}
3 changes: 0 additions & 3 deletions heartbeat/monitors/active/http/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import (
)

type Config struct {
Name string `config:"name"`

URLs []string `config:"urls" validate:"required"`
ProxyURL string `config:"proxy_url"`
Timeout time.Duration `config:"timeout"`
Expand Down Expand Up @@ -87,7 +85,6 @@ type compressionConfig struct {
}

var defaultConfig = Config{
Name: "http",
Timeout: 16 * time.Second,
MaxRedirects: 10,
Mode: monitors.DefaultIPSettings,
Expand Down
16 changes: 8 additions & 8 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func testTLSRequest(t *testing.T, testURL string, extraConfig map[string]interfa
job := jobs[0]

event := &beat.Event{}
_, err = job.Run(event)
_, err = job(event)
require.NoError(t, err)

require.Equal(t, 1, endpoints)
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestUpStatuses(t *testing.T) {
mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "http", "up"),
hbtest.MonitorChecks(server.URL, "127.0.0.1", "http", "up"),
hbtest.RespondingTCPChecks(port),
respondingHTTPChecks(server.URL, status),
)),
Expand All @@ -209,7 +209,7 @@ func TestDownStatuses(t *testing.T) {
mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "http", "down"),
hbtest.MonitorChecks(server.URL, "127.0.0.1", "http", "down"),
hbtest.RespondingTCPChecks(port),
respondingHTTPChecks(server.URL, status),
hbtest.ErrorChecks(fmt.Sprintf("%d", status), "validate"),
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestLargeResponse(t *testing.T) {
job := jobs[0]

event := &beat.Event{}
_, err = job.Run(event)
_, err = job(event)
require.NoError(t, err)

port, err := hbtest.ServerPort(server)
Expand All @@ -248,7 +248,7 @@ func TestLargeResponse(t *testing.T) {
mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "http", "up"),
hbtest.MonitorChecks(server.URL, "127.0.0.1", "http", "up"),
hbtest.RespondingTCPChecks(port),
respondingHTTPChecks(server.URL, 200),
)),
Expand Down Expand Up @@ -282,7 +282,7 @@ func runHTTPSServerCheck(
mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "https", "up"),
hbtest.MonitorChecks(server.URL, "127.0.0.1", "https", "up"),
hbtest.RespondingTCPChecks(port),
hbtest.TLSChecks(0, 0, cert),
respondingHTTPChecks(server.URL, http.StatusOK),
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestConnRefusedJob(t *testing.T) {
mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+url, url, ip, "http", "down"),
hbtest.MonitorChecks(url, ip, "http", "down"),
hbtest.TCPBaseChecks(port),
hbtest.ErrorChecks(url, "io"),
httpBaseChecks(url),
Expand All @@ -369,7 +369,7 @@ func TestUnreachableJob(t *testing.T) {
mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+url, url, ip, "http", "down"),
hbtest.MonitorChecks(url, ip, "http", "down"),
hbtest.TCPBaseChecks(uint16(port)),
hbtest.ErrorChecks(url, "io"),
httpBaseChecks(url),
Expand Down
43 changes: 19 additions & 24 deletions heartbeat/monitors/active/http/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func newHTTPMonitorHostJob(
body []byte,
validator RespCheck,
) (monitors.Job, error) {
typ := config.Name

client := &http.Client{
CheckRedirect: makeCheckRedirect(config.MaxRedirects),
Expand All @@ -66,26 +65,24 @@ func newHTTPMonitorHostJob(

timeout := config.Timeout

id := fmt.Sprintf("%v@%v", typ, addr)
return monitors.WithJobId(id,
monitors.WithFields(
common.MapStr{
"monitor": common.MapStr{
"scheme": request.URL.Scheme,
"host": hostname,
},
"http": common.MapStr{
"url": request.URL.String(),
},
"tcp": common.MapStr{
"port": port,
},
return monitors.WithFields(
common.MapStr{
"monitor": common.MapStr{
"scheme": request.URL.Scheme,
"host": hostname,
},
monitors.MakeSimpleJob(func(event *beat.Event) error {
_, _, err := execPing(event, client, request, body, timeout, validator)
return err
}),
)), nil
"http": common.MapStr{
"url": request.URL.String(),
},
"tcp": common.MapStr{
"port": port,
},
},
monitors.MakeSimpleJob(func(event *beat.Event) error {
_, _, err := execPing(event, client, request, body, timeout, validator)
return err
}),
), nil
}

func newHTTPMonitorIPsJob(
Expand All @@ -96,7 +93,6 @@ func newHTTPMonitorIPsJob(
body []byte,
validator RespCheck,
) (monitors.Job, error) {
typ := config.Name

req, err := buildRequest(addr, config, enc)
if err != nil {
Expand All @@ -108,8 +104,7 @@ func newHTTPMonitorIPsJob(
return nil, err
}

id := fmt.Sprintf("%v@%v", typ, addr)
settings := monitors.MakeHostJobSettings(id, hostname, config.Mode)
settings := monitors.MakeHostJobSettings(hostname, config.Mode)

pingFactory := createPingFactory(config, port, tls, req, body, validator)
job, err := monitors.MakeByHostJob(settings, pingFactory)
Expand All @@ -126,7 +121,7 @@ func newHTTPMonitorIPsJob(
},
}

return monitors.WithJobId(id, monitors.WithFields(fields, job)), err
return monitors.WithFields(fields, job), err
}

func createPingFactory(
Expand Down
3 changes: 0 additions & 3 deletions heartbeat/monitors/active/icmp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
)

type Config struct {
Name string `config:"name"`

Hosts []string `config:"hosts" validate:"required"`
Mode monitors.IPSettings `config:",inline"`

Expand All @@ -34,7 +32,6 @@ type Config struct {
}

var DefaultConfig = Config{
Name: "icmp",
Mode: monitors.DefaultIPSettings,

Timeout: 16 * time.Second,
Expand Down
8 changes: 1 addition & 7 deletions heartbeat/monitors/active/icmp/icmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,10 @@ func create(
return nil, 0, err
}

network := config.Mode.Network()
pingFactory := monitors.MakePingIPFactory(createPingIPFactory(&config))

for _, host := range config.Hosts {
jobName := fmt.Sprintf("icmp-%v-host-%v@%v", config.Name, network, host)
if ip := net.ParseIP(host); ip != nil {
jobName = fmt.Sprintf("icmp-%v-ip@%v", config.Name, ip.String())
}

settings := monitors.MakeHostJobSettings(jobName, host, config.Mode)
settings := monitors.MakeHostJobSettings(host, config.Mode)
err := addJob(monitors.MakeByHostJob(settings, pingFactory))
if err != nil {
return nil, 0, err
Expand Down
3 changes: 0 additions & 3 deletions heartbeat/monitors/active/tcp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
)

type Config struct {
Name string `config:"name"`

// check all ports if host does not contain port
Hosts []string `config:"hosts" validate:"required"`
Ports []uint16 `config:"ports"`
Expand All @@ -49,7 +47,6 @@ type Config struct {
}

var DefaultConfig = Config{
Name: "tcp",
Timeout: 16 * time.Second,
Mode: monitors.DefaultIPSettings,
}
Expand Down
3 changes: 1 addition & 2 deletions heartbeat/monitors/active/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func create(
return nil, 0, err
}

typ := config.Name
timeout := config.Timeout
validator := makeValidateConn(&config)

Expand All @@ -87,7 +86,7 @@ func create(
return nil, 0, err
}

epJobs, err := dialchain.MakeDialerJobs(db, typ, scheme, eps, config.Mode,
epJobs, err := dialchain.MakeDialerJobs(db, scheme, eps, config.Mode,
func(event *beat.Event, dialer transport.Dialer, addr string) error {
return pingHost(event, dialer, addr, timeout, validator)
})
Expand Down
Loading

0 comments on commit 6dd4508

Please sign in to comment.