[heartbeat] Set IDs explicitly #9697

Merged
merged 8 commits into from
Jan 7, 2019
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -27,6 +27,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Heartbeat*

- Remove monitor generator script that was rarely used. {pull}9648[9648]
- Monitor IDs are now configurable. Auto-generated monitor IDs now use a different formula based on a hash of their config values. To keep continuity with the old monitor ID format, set the `id` property explicitly. {pull}9697[9697]
Contributor

Should the change to the `name` field also be mentioned here?

Contributor Author

No need; its behavior hasn't really changed. What has changed is our semantic interpretation of it for the UI, which no one is using yet, so I don't think it's worth discussing.


*Journalbeat*

2 changes: 1 addition & 1 deletion heartbeat/beater/heartbeat.go
@@ -143,7 +143,7 @@ func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) {
func (bt *Heartbeat) RunReloadableMonitors(b *beat.Beat) (err error) {
// Check monitor configs
if err := bt.monitorReloader.Check(bt.dynamicFactory); err != nil {
return err
logp.Error(errors.Wrap(err, "error loading reloadable monitors"))
}

// Execute the monitor
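
The hunk above swaps `return err` for a logged, wrapped error, so a failed config check no longer stops the function before the monitors are executed. A minimal sketch of that log-and-continue pattern follows, using only the standard library. `checkMonitors` and `runReloadableMonitors` are hypothetical stand-ins, and `log.Printf` with `fmt.Errorf` replaces the `logp.Error` and `errors.Wrap` calls from the diff.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// checkMonitors is a hypothetical stand-in for bt.monitorReloader.Check.
func checkMonitors(valid bool) error {
	if !valid {
		return errors.New("invalid monitor config")
	}
	return nil
}

// runReloadableMonitors logs a failed check instead of returning it,
// then carries on to the execution step. The bool reports whether that
// later step was reached.
func runReloadableMonitors(valid bool) (bool, error) {
	if err := checkMonitors(valid); err != nil {
		// Wrap the error with context and log it; do not return.
		log.Printf("%v", fmt.Errorf("error loading reloadable monitors: %w", err))
	}
	// Execution of the monitors would start here.
	return true, nil
}

func main() {
	started, err := runReloadableMonitors(false)
	fmt.Println(started, err)
}
```

The trade-off is that a bad reloadable config is now reported only in the logs, so callers can no longer react to it through the return value.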
17 changes: 15 additions & 2 deletions heartbeat/docs/heartbeat-options.asciidoc
@@ -96,13 +96,26 @@ expected response. See <<monitor-http-options>>.
The `tcp` and `http` monitor types both support SSL/TLS and some proxy
settings.

[float]
[[monitor-id]]
==== `id`

A unique identifier for this configuration. It should stay the same when you edit the monitor configuration,
regardless of which config fields change. Examples: `uploader-service`, `http://example.net`, `us-west-loadbalancer`. Note that this uniqueness only applies within a given Heartbeat instance. If you want to monitor the same endpoint from multiple locations, it is recommended that those Heartbeat instances use the same IDs so that their results can be correlated. You can use the `host.geo.name` property to disambiguate them.

When querying indexed monitor data, this is the field you will aggregate on. It appears in the
<<exported-fields,exported fields>> as `monitor.id`.

If you do not set `id` explicitly, the monitor's config is hashed and the generated value is used. This value
changes whenever any option of the monitor changes, which makes aggregations over time across such changes impossible. For this reason,
it is recommended that you set the ID manually.
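
The fallback described in the `id` documentation can be illustrated with a small Go sketch. It is not Heartbeat's actual formula, which this diff does not show: `monitorID`, the SHA-256 over a JSON encoding, and the `auto-` prefix are all assumptions made for illustration. The point is only that a hash-derived ID changes with any config edit, while an explicit `id` does not.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// monitorID is a hypothetical helper, not Heartbeat's implementation.
// It returns the explicit "id" when one is set and otherwise falls
// back to a hash of the config values.
func monitorID(cfg map[string]interface{}) (string, error) {
	if id, ok := cfg["id"].(string); ok && id != "" {
		return id, nil
	}
	// json.Marshal sorts map keys, so equal configs encode identically.
	raw, err := json.Marshal(cfg)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(raw)
	// The "auto-" prefix and 8-byte truncation are illustrative choices.
	return "auto-" + hex.EncodeToString(sum[:8]), nil
}

func main() {
	configs := []map[string]interface{}{
		{"type": "http", "urls": []string{"http://example.net"}, "timeout": "16s"},
		{"type": "http", "urls": []string{"http://example.net"}, "timeout": "5s"},
		{"id": "uploader-service", "type": "http", "timeout": "5s"},
	}
	for _, cfg := range configs {
		id, err := monitorID(cfg)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(id)
	}
}
```

The first two configs differ only in `timeout` yet produce different generated IDs, which is why aggregations break across edits unless `id` is pinned as in the third config.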
andrewvc marked this conversation as resolved.

[float]
[[monitor-name]]
==== `name`

The monitor name. This value appears in the <<exported-fields,exported fields>>
under the `monitor` field as the job name and the `type` field as the job type.
Optional human-readable name for this monitor. This value appears in the <<exported-fields,exported fields>>
as `monitor.name`.

[float]
[[monitor-enabled]]
3 changes: 1 addition & 2 deletions heartbeat/hbtest/hbtestutil.go
@@ -95,14 +95,13 @@ func TLSChecks(chainIndex, certIndex int, certificate *x509.Certificate) mapval.

// MonitorChecks creates a skima.Validator that represents the "monitor" field present
// in all heartbeat events.
func MonitorChecks(id string, host string, ip string, scheme string, status string) mapval.Validator {
func MonitorChecks(host string, ip string, scheme string, status string) mapval.Validator {
return mapval.MustCompile(mapval.Map{
"monitor": mapval.Map{
// TODO: This is only optional because, for some reason, TCP returns
// this value, but HTTP does not. We should fix this
"host": mapval.Optional(mapval.IsEqual(host)),
"duration.us": mapval.IsDuration,
"id": id,
"ip": ip,
"scheme": scheme,
"status": status,
22 changes: 5 additions & 17 deletions heartbeat/monitors/active/dialchain/builder.go
@@ -18,7 +18,6 @@
package dialchain

import (
"fmt"
"net"
"strconv"
"time"
@@ -130,14 +129,14 @@ func (b *Builder) Run(
// correctly resolved endpoint.
func MakeDialerJobs(
b *Builder,
typ, scheme string,
scheme string,
endpoints []Endpoint,
mode monitors.IPSettings,
fn func(event *beat.Event, dialer transport.Dialer, addr string) error,
) ([]monitors.Job, error) {
var jobs []monitors.Job
for _, endpoint := range endpoints {
endpointJobs, err := makeEndpointJobs(b, typ, scheme, endpoint, mode, fn)
endpointJobs, err := makeEndpointJobs(b, scheme, endpoint, mode, fn)
if err != nil {
return nil, err
}
@@ -149,7 +148,7 @@ func MakeDialerJobs(

func makeEndpointJobs(
b *Builder,
typ, scheme string,
scheme string,
endpoint Endpoint,
mode monitors.IPSettings,
fn func(*beat.Event, transport.Dialer, string) error,
@@ -180,8 +179,7 @@ func makeEndpointJobs(

// Create job that first resolves one or multiple IP (depending on
// config.Mode) in order to create one continuation Task per IP.
jobID := jobID(typ, scheme, endpoint.Host, endpoint.Ports)
settings := monitors.MakeHostJobSettings(jobID, endpoint.Host, mode)
settings := monitors.MakeHostJobSettings(endpoint.Host, mode)

job, err := monitors.MakeByHostJob(settings,
monitors.MakePingAllIPPortFactory(endpoint.Ports,
@@ -199,15 +197,5 @@ func makeEndpointJobs(
if err != nil {
return nil, err
}
return []monitors.Job{monitors.WithJobId(jobID, monitors.WithFields(fields, job))}, nil
}

func jobID(typ, jobType, host string, ports []uint16) string {
Contributor

I see there was some value in the old id. I wonder if this is still something we can reconstruct on the UI side as all the data should be available there if needed?

Contributor Author

Yes, exactly. I don't think we're missing it inside of heartbeat itself.

var h string
if len(ports) == 1 {
h = fmt.Sprintf("%v:%v", host, ports[0])
} else {
h = fmt.Sprintf("%v:%v", host, ports)
}
return fmt.Sprintf("%v-%v@%v", typ, jobType, h)
return []monitors.Job{monitors.WithFields(fields, job)}, nil
}
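
As the review thread above suggests, the old ID can be rebuilt outside Heartbeat from data that is still available. The sketch below reproduces the removed `jobID` logic as a standalone helper. The name `legacyJobID` and the sample arguments are hypothetical; only the format strings come from the deleted code.

```go
package main

import "fmt"

// legacyJobID mirrors the jobID function removed in this diff.
// The name is hypothetical; the format strings are taken from the
// deleted code.
func legacyJobID(typ, scheme, host string, ports []uint16) string {
	var h string
	if len(ports) == 1 {
		// A single port is rendered as host:port.
		h = fmt.Sprintf("%v:%v", host, ports[0])
	} else {
		// Several ports are rendered with Go's default slice format.
		h = fmt.Sprintf("%v:%v", host, ports)
	}
	return fmt.Sprintf("%v-%v@%v", typ, scheme, h)
}

func main() {
	// Sample values are illustrative only.
	fmt.Println(legacyJobID("tcp", "tcp", "example.net", []uint16{80}))
	fmt.Println(legacyJobID("tcp", "tcp", "example.net", []uint16{80, 443}))
}
```

Anyone who needs continuity with the old format could compute such a value once and set it as the monitor's explicit `id`.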
3 changes: 0 additions & 3 deletions heartbeat/monitors/active/http/config.go
@@ -31,8 +31,6 @@ import (
)

type Config struct {
Name string `config:"name"`

URLs []string `config:"urls" validate:"required"`
ProxyURL string `config:"proxy_url"`
Timeout time.Duration `config:"timeout"`
@@ -87,7 +85,6 @@ type compressionConfig struct {
}

var defaultConfig = Config{
Name: "http",
Timeout: 16 * time.Second,
MaxRedirects: 10,
Mode: monitors.DefaultIPSettings,
16 changes: 8 additions & 8 deletions heartbeat/monitors/active/http/http_test.go
@@ -67,7 +67,7 @@ func testTLSRequest(t *testing.T, testURL string, extraConfig map[string]interfa
job := jobs[0]

event := &beat.Event{}
_, err = job.Run(event)
_, err = job(event)
require.NoError(t, err)

require.Equal(t, 1, endpoints)
@@ -188,7 +188,7 @@ func TestUpStatuses(t *testing.T) {
mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "http", "up"),
hbtest.MonitorChecks(server.URL, "127.0.0.1", "http", "up"),
hbtest.RespondingTCPChecks(port),
respondingHTTPChecks(server.URL, status),
)),
@@ -209,7 +209,7 @@ func TestDownStatuses(t *testing.T) {
mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "http", "down"),
hbtest.MonitorChecks(server.URL, "127.0.0.1", "http", "down"),
hbtest.RespondingTCPChecks(port),
respondingHTTPChecks(server.URL, status),
hbtest.ErrorChecks(fmt.Sprintf("%d", status), "validate"),
@@ -239,7 +239,7 @@ func TestLargeResponse(t *testing.T) {
job := jobs[0]

event := &beat.Event{}
_, err = job.Run(event)
_, err = job(event)
require.NoError(t, err)

port, err := hbtest.ServerPort(server)
@@ -248,7 +248,7 @@ func TestLargeResponse(t *testing.T) {
mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "http", "up"),
hbtest.MonitorChecks(server.URL, "127.0.0.1", "http", "up"),
hbtest.RespondingTCPChecks(port),
respondingHTTPChecks(server.URL, 200),
)),
@@ -282,7 +282,7 @@ func runHTTPSServerCheck(
mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+server.URL, server.URL, "127.0.0.1", "https", "up"),
hbtest.MonitorChecks(server.URL, "127.0.0.1", "https", "up"),
hbtest.RespondingTCPChecks(port),
hbtest.TLSChecks(0, 0, cert),
respondingHTTPChecks(server.URL, http.StatusOK),
@@ -347,7 +347,7 @@ func TestConnRefusedJob(t *testing.T) {
mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+url, url, ip, "http", "down"),
hbtest.MonitorChecks(url, ip, "http", "down"),
hbtest.TCPBaseChecks(port),
hbtest.ErrorChecks(url, "io"),
httpBaseChecks(url),
@@ -369,7 +369,7 @@ func TestUnreachableJob(t *testing.T) {
mapvaltest.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.MonitorChecks("http@"+url, url, ip, "http", "down"),
hbtest.MonitorChecks(url, ip, "http", "down"),
hbtest.TCPBaseChecks(uint16(port)),
hbtest.ErrorChecks(url, "io"),
httpBaseChecks(url),
43 changes: 19 additions & 24 deletions heartbeat/monitors/active/http/task.go
@@ -47,7 +47,6 @@ func newHTTPMonitorHostJob(
body []byte,
validator RespCheck,
) (monitors.Job, error) {
typ := config.Name

client := &http.Client{
CheckRedirect: makeCheckRedirect(config.MaxRedirects),
@@ -66,26 +65,24 @@ func newHTTPMonitorHostJob(

timeout := config.Timeout

id := fmt.Sprintf("%v@%v", typ, addr)
return monitors.WithJobId(id,
monitors.WithFields(
common.MapStr{
"monitor": common.MapStr{
"scheme": request.URL.Scheme,
"host": hostname,
},
"http": common.MapStr{
"url": request.URL.String(),
},
"tcp": common.MapStr{
"port": port,
},
return monitors.WithFields(
common.MapStr{
"monitor": common.MapStr{
"scheme": request.URL.Scheme,
"host": hostname,
},
monitors.MakeSimpleJob(func(event *beat.Event) error {
_, _, err := execPing(event, client, request, body, timeout, validator)
return err
}),
)), nil
"http": common.MapStr{
"url": request.URL.String(),
},
"tcp": common.MapStr{
"port": port,
},
},
monitors.MakeSimpleJob(func(event *beat.Event) error {
_, _, err := execPing(event, client, request, body, timeout, validator)
return err
}),
), nil
}

func newHTTPMonitorIPsJob(
@@ -96,7 +93,6 @@ func newHTTPMonitorIPsJob(
body []byte,
validator RespCheck,
) (monitors.Job, error) {
typ := config.Name

req, err := buildRequest(addr, config, enc)
if err != nil {
@@ -108,8 +104,7 @@ func newHTTPMonitorIPsJob(
return nil, err
}

id := fmt.Sprintf("%v@%v", typ, addr)
settings := monitors.MakeHostJobSettings(id, hostname, config.Mode)
settings := monitors.MakeHostJobSettings(hostname, config.Mode)

pingFactory := createPingFactory(config, port, tls, req, body, validator)
job, err := monitors.MakeByHostJob(settings, pingFactory)
@@ -126,7 +121,7 @@ func newHTTPMonitorIPsJob(
},
}

return monitors.WithJobId(id, monitors.WithFields(fields, job)), err
return monitors.WithFields(fields, job), err
}

func createPingFactory(
3 changes: 0 additions & 3 deletions heartbeat/monitors/active/icmp/config.go
@@ -24,8 +24,6 @@ import (
)

type Config struct {
Name string `config:"name"`

Hosts []string `config:"hosts" validate:"required"`
Mode monitors.IPSettings `config:",inline"`

@@ -34,7 +32,6 @@ type Config struct {
}

var DefaultConfig = Config{
Name: "icmp",
Mode: monitors.DefaultIPSettings,

Timeout: 16 * time.Second,
8 changes: 1 addition & 7 deletions heartbeat/monitors/active/icmp/icmp.go
@@ -78,16 +78,10 @@ func create(
return nil, 0, err
}

network := config.Mode.Network()
pingFactory := monitors.MakePingIPFactory(createPingIPFactory(&config))

for _, host := range config.Hosts {
jobName := fmt.Sprintf("icmp-%v-host-%v@%v", config.Name, network, host)
if ip := net.ParseIP(host); ip != nil {
jobName = fmt.Sprintf("icmp-%v-ip@%v", config.Name, ip.String())
}

settings := monitors.MakeHostJobSettings(jobName, host, config.Mode)
settings := monitors.MakeHostJobSettings(host, config.Mode)
err := addJob(monitors.MakeByHostJob(settings, pingFactory))
if err != nil {
return nil, 0, err
3 changes: 0 additions & 3 deletions heartbeat/monitors/active/tcp/config.go
@@ -28,8 +28,6 @@ import (
)

type Config struct {
Name string `config:"name"`

// check all ports if host does not contain port
Hosts []string `config:"hosts" validate:"required"`
Ports []uint16 `config:"ports"`
@@ -49,7 +47,6 @@ type Config struct {
}

var DefaultConfig = Config{
Name: "tcp",
Timeout: 16 * time.Second,
Mode: monitors.DefaultIPSettings,
}
3 changes: 1 addition & 2 deletions heartbeat/monitors/active/tcp/tcp.go
@@ -68,7 +68,6 @@ func create(
return nil, 0, err
}

typ := config.Name
timeout := config.Timeout
validator := makeValidateConn(&config)

@@ -87,7 +86,7 @@ func create(
return nil, 0, err
}

epJobs, err := dialchain.MakeDialerJobs(db, typ, scheme, eps, config.Mode,
epJobs, err := dialchain.MakeDialerJobs(db, scheme, eps, config.Mode,
func(event *beat.Event, dialer transport.Dialer, addr string) error {
return pingHost(event, dialer, addr, timeout, validator)
})