Skip to content

Commit

Permalink
Support "cluster" scope in Metricbeat elasticsearch module (#18547) (#…
Browse files Browse the repository at this point in the history
…20413)

* Adding configuration for hosts_mode

* Only perform master check in HostsModeNode

* Only ask the node if it's the master node if we're in HostsModeNode

* Unpack host_mode string into enum

* Adding some specific TODOs in node_stats code

* Updating x-pack/metricbeat reference config

* Set correct service URI

* Get master node ID

* Adding CHANGELOG entry

* Rename hosts_mode => scope

* Removing stale TODO comment

* Adding docs

* Refactoring common code into helper method

* Do not set service URI up front

* Updating documentation per review

* Remove comments from doc examples

* Adding configuration for hosts_mode

* Set correct service URI

* Adding CHANGELOG entry

* Rename hosts_mode => scope

* Do not set service URI up front

* Update metricbeat/docs/modules/elasticsearch.asciidoc

Co-authored-by: DeDe Morton <[email protected]>

* Update metricbeat/module/elasticsearch/_meta/docs.asciidoc

Co-authored-by: DeDe Morton <[email protected]>

* Update reference config

* Cleaning up CHANGELOG

* Updating generated files

Co-authored-by: DeDe Morton <[email protected]>

Co-authored-by: DeDe Morton <[email protected]>
  • Loading branch information
ycombinator and dedemorton authored Aug 4, 2020
1 parent 1713a9a commit 903750c
Show file tree
Hide file tree
Showing 20 changed files with 222 additions and 97 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,7 @@ field. You can revert this change by configuring tags for the module and omittin
- Added cache and connection_errors metrics to status metricset of MySQL module {issue}16955[16955] {pull}19844[19844]
- Update MySQL dashboard with connection errors and cache metrics {pull}19913[19913] {issue}16955[16955]
- Add cloud.instance.name into aws ec2 metricset. {pull}20077[20077]
- Add `scope` setting for elasticsearch module, allowing it to monitor an Elasticsearch cluster behind a load-balancing proxy. {issue}18539[18539] {pull}18547[18547]

*Packetbeat*

Expand Down
16 changes: 11 additions & 5 deletions metricbeat/docs/modules/elasticsearch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,16 @@ The `elasticsearch` module collects metrics about {es}.
The `elasticsearch` module works with {es} 6.7.0 and later.

[float]
=== Usage for Stack Monitoring
=== Module-specific configuration notes

Like other {beatname_uc} modules, the `elasticsearch` module accepts a `hosts` configuration setting.
This setting can contain a list of entries. The related `scope` setting determines how each entry in
the `hosts` list is interpreted by the module.

* If `scope` is set to `node` (default), each entry in the `hosts` list indicates a distinct node in an
{es} cluster.
* If `scope` is set to `cluster`, each entry in the `hosts` list indicates a single endpoint for a distinct
{es} cluster (for example, a load-balancing proxy fronting the cluster).

The `elasticsearch` module can be used to collect metrics shown in our {stack} {monitor-features}
UI in {kib}. To enable this usage, set `xpack.enabled: true` and remove any `metricsets`
Expand Down Expand Up @@ -45,12 +54,9 @@ metricbeat.modules:
#password: "changeme"
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
# Set to false to fetch all entries
#index_recovery.active_only: true
# Set to true to send data collected by module to X-Pack
# Monitoring instead of metricbeat-* indices.
#xpack.enabled: false
#scope: node
----

This module supports TLS connections when using `ssl` config field, as described in <<configuration-ssl>>.
Expand Down
5 changes: 1 addition & 4 deletions metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,9 @@ metricbeat.modules:
#password: "changeme"
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# Set to false to fetch all entries
#index_recovery.active_only: true

# Set to true to send data collected by module to X-Pack
# Monitoring instead of metricbeat-* indices.
#xpack.enabled: false
#scope: node

#------------------------------ Envoyproxy Module ------------------------------
- module: envoyproxy
Expand Down
5 changes: 1 addition & 4 deletions metricbeat/module/elasticsearch/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
#password: "changeme"
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# Set to false to fetch all entries
#index_recovery.active_only: true

# Set to true to send data collected by module to X-Pack
# Monitoring instead of metricbeat-* indices.
#xpack.enabled: false
#scope: node
11 changes: 10 additions & 1 deletion metricbeat/module/elasticsearch/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@ The `elasticsearch` module collects metrics about {es}.
The `elasticsearch` module works with {es} 6.7.0 and later.

[float]
=== Usage for Stack Monitoring
=== Module-specific configuration notes

Like other {beatname_uc} modules, the `elasticsearch` module accepts a `hosts` configuration setting.
This setting can contain a list of entries. The related `scope` setting determines how each entry in
the `hosts` list is interpreted by the module.

* If `scope` is set to `node` (default), each entry in the `hosts` list indicates a distinct node in an
{es} cluster.
* If `scope` is set to `cluster`, each entry in the `hosts` list indicates a single endpoint for a distinct
{es} cluster (for example, a load-balancing proxy fronting the cluster).

The `elasticsearch` module can be used to collect metrics shown in our {stack} {monitor-features}
UI in {kib}. To enable this usage, set `xpack.enabled: true` and remove any `metricsets`
Expand Down
9 changes: 3 additions & 6 deletions metricbeat/module/elasticsearch/ccr/ccr.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch gathers stats for each follower shard from the _ccr/stats API
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.GetServiceURI())
shouldSkip, err := m.ShouldSkipFetch()
if err != nil {
return errors.Wrap(err, "error determining if connected Elasticsearch node is master")
return err
}

// Not master, no event sent
if !isMaster {
m.Logger().Debug("trying to fetch ccr stats from a non-master node")
if shouldSkip {
return nil
}

Expand Down
11 changes: 3 additions & 8 deletions metricbeat/module/elasticsearch/cluster_stats/cluster_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package cluster_stats

import (
"github.com/pkg/errors"

"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/elasticsearch"
)
Expand Down Expand Up @@ -51,14 +49,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch methods implements the data gathering and data conversion to the right format
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+clusterStatsPath)
shouldSkip, err := m.ShouldSkipFetch()
if err != nil {
return errors.Wrap(err, "error determining if connected Elasticsearch node is master")
return err
}

// Not master, no event sent
if !isMaster {
m.Logger().Debug("trying to fetch cluster stats from a non-master node")
if shouldSkip {
return nil
}

Expand Down
22 changes: 22 additions & 0 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,28 @@ func IsMLockAllEnabled(http *helper.HTTP, resetURI, nodeID string) (bool, error)
return false, fmt.Errorf("could not determine if mlockall is enabled on node ID = %v", nodeID)
}

// GetMasterNodeID returns the ID of the Elasticsearch cluster's master node
func GetMasterNodeID(http *helper.HTTP, resetURI string) (string, error) {
content, err := fetchPath(http, resetURI, "_nodes/_master", "filter_path=nodes.*.name")
if err != nil {
return "", err
}

var response struct {
Nodes map[string]interface{} `json:"nodes"`
}

if err := json.Unmarshal(content, &response); err != nil {
return "", err
}

for nodeID, _ := range response.Nodes {
return nodeID, nil
}

return "", errors.New("could not determine master node ID")
}

// PassThruField copies the field at the given path from the given source data object into
// the same path in the given target data object.
func PassThruField(fieldPath string, sourceData, targetData common.MapStr) error {
Expand Down
9 changes: 3 additions & 6 deletions metricbeat/module/elasticsearch/enrich/enrich.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch gathers stats for each enrich coordinator node
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.GetServiceURI())
shouldSkip, err := m.ShouldSkipFetch()
if err != nil {
return errors.Wrap(err, "error determining if connected Elasticsearch node is master")
return err
}

// Not master, no event sent
if !isMaster {
m.Logger().Debug("trying to fetch enrich stats from a non-master node")
if shouldSkip {
return nil
}

Expand Down
10 changes: 3 additions & 7 deletions metricbeat/module/elasticsearch/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch gathers stats for each index from the _stats API
func (m *MetricSet) Fetch(r mb.ReporterV2) error {

isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+statsPath)
shouldSkip, err := m.ShouldSkipFetch()
if err != nil {
return errors.Wrap(err, "error determining if connected Elasticsearch node is master")
return err
}

// Not master, no event sent
if !isMaster {
m.Logger().Debug("trying to fetch index stats from a non-master node")
if shouldSkip {
return nil
}

Expand Down
11 changes: 3 additions & 8 deletions metricbeat/module/elasticsearch/index_recovery/index_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package index_recovery

import (
"github.com/pkg/errors"

"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/elasticsearch"
)
Expand Down Expand Up @@ -67,14 +65,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch gathers stats for each index from the _stats API
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.GetServiceURI())
shouldSkip, err := m.ShouldSkipFetch()
if err != nil {
return errors.Wrap(err, "error determining if connected Elasticsearch node is master")
return err
}

// Not master, no event sent
if !isMaster {
m.Logger().Debug("trying to fetch index recovery stats from a non-master node")
if shouldSkip {
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch gathers stats for each index from the _stats API
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+statsPath)
shouldSkip, err := m.ShouldSkipFetch()
if err != nil {
return errors.Wrap(err, "error determining if connected Elasticsearch node is master")
return err
}

// Not master, no event sent
if !isMaster {
m.Logger().Debug("trying to fetch index summary stats from a non-master node")
if shouldSkip {
return nil
}

Expand Down
54 changes: 53 additions & 1 deletion metricbeat/module/elasticsearch/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
package elasticsearch

import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
Expand All @@ -36,13 +40,39 @@ var (
}.Build()
)

type Scope int

const (
// Indicates that each item in the hosts list points to a distinct Elasticsearch node in a
// cluster.
ScopeNode Scope = iota

// Indicates that each item in the hosts lists points to a endpoint for a distinct Elasticsearch
// cluster (e.g. a load-balancing proxy) fronting the cluster.
ScopeCluster
)

func (h *Scope) Unpack(str string) error {
switch str {
case "node":
*h = ScopeNode
case "cluster":
*h = ScopeCluster
default:
return fmt.Errorf("invalid scope: %v", str)
}

return nil
}

// MetricSet can be used to build other metric sets that query RabbitMQ
// management plugin
type MetricSet struct {
mb.BaseMetricSet
servicePath string
*helper.HTTP
XPack bool
Scope Scope
}

// NewMetricSet creates an metric set that can be used to build other metric
Expand All @@ -54,9 +84,11 @@ func NewMetricSet(base mb.BaseMetricSet, servicePath string) (*MetricSet, error)
}

config := struct {
XPack bool `config:"xpack.enabled"`
XPack bool `config:"xpack.enabled"`
Scope Scope `config:"scope"`
}{
XPack: false,
Scope: ScopeNode,
}
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
Expand All @@ -67,6 +99,7 @@ func NewMetricSet(base mb.BaseMetricSet, servicePath string) (*MetricSet, error)
servicePath,
http,
config.XPack,
config.Scope,
}

ms.SetServiceURI(servicePath)
Expand All @@ -84,3 +117,22 @@ func (m *MetricSet) SetServiceURI(servicePath string) {
m.servicePath = servicePath
m.HTTP.SetURI(m.GetServiceURI())
}

func (m *MetricSet) ShouldSkipFetch() (bool, error) {
// If we're talking to a set of ES nodes directly, only collect stats from the master node so
// we don't collect the same stats from every node and end up duplicating them.
if m.Scope == ScopeNode {
isMaster, err := IsMaster(m.HTTP, m.GetServiceURI())
if err != nil {
return false, errors.Wrap(err, "error determining if connected Elasticsearch node is master")
}

// Not master, no event sent
if !isMaster {
m.Logger().Debugf("trying to fetch %v stats from a non-master node", m.Name())
return true, nil
}
}

return false, nil
}
12 changes: 3 additions & 9 deletions metricbeat/module/elasticsearch/ml_job/ml_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package ml_job

import (
"github.com/pkg/errors"

"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/elasticsearch"
)
Expand Down Expand Up @@ -54,15 +52,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch methods implements the data gathering and data conversion to the right format
func (m *MetricSet) Fetch(r mb.ReporterV2) error {

isMaster, err := elasticsearch.IsMaster(m.HTTP, m.GetServiceURI())
shouldSkip, err := m.ShouldSkipFetch()
if err != nil {
return errors.Wrap(err, "error determining if connected Elasticsearch node is master")
return err
}

// Not master, no event sent
if !isMaster {
m.Logger().Debug("trying to fetch machine learning job stats from a non-master node")
if shouldSkip {
return nil
}

Expand Down
17 changes: 7 additions & 10 deletions metricbeat/module/elasticsearch/node_stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,18 +187,14 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info,
return errors.Wrap(err, "failure parsing Elasticsearch Node Stats API response")
}

// Normally the nodeStruct should only contain one node. But if _local is removed
// from the path and Metricbeat is not installed on the same machine as the node
// it will provid the data for multiple nodes. This will mean the detection of the
// master node will not be accurate anymore as often in these cases a proxy is in front
// of ES and it's not know if the request will be routed to the same node as before.
masterNodeID, err := elasticsearch.GetMasterNodeID(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
}

var errs multierror.Errors
for nodeID, node := range nodesStruct.Nodes {
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HTTP.GetURI())
if err != nil {
errs = append(errs, errors.Wrap(err, "error determining if connected Elasticsearch node is master"))
continue
}
isMaster := nodeID == masterNodeID

event := mb.Event{}

Expand All @@ -207,6 +203,7 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info,
errs = append(errs, errors.Wrap(err, "failure to apply node schema"))
continue
}

nodeData["node_master"] = isMaster
nodeData["node_id"] = nodeID

Expand Down
Loading

0 comments on commit 903750c

Please sign in to comment.