Skip to content

Commit

Permalink
Removing xpack.monitoring.* settings (#18608)
Browse files Browse the repository at this point in the history
* Removing xpack.monitoring.* setting and corresponding code

* Updating docs

* Adding CHANGELOG entry

* Removing ES calls to custom monitoring endpoints!

* Removing xpack monitoring API params

* Removing format from test
ycombinator authored Jul 17, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 6197850 commit a6f180e
Showing 14 changed files with 23 additions and 512 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Remove the non-ECS `agent.hostname` field. Use the `agent.name` or `agent.id` fields for an identifier. {issue}16377[16377] {pull}18328[18328]
- Make error message about locked data path actionable. {pull}18667[18667]
- Ensure dynamic template names are unique for the same field. {pull}18849[18849]
- Remove the deprecated `xpack.monitoring.*` settings. Going forward only `monitoring.*` settings may be used. {issue}9424[9424] {pull}18608[18608]

*Auditbeat*

8 changes: 2 additions & 6 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
@@ -888,12 +888,9 @@ func (b *Beat) clusterUUIDFetchingCallback() (elasticsearch.ConnectCallback, err
}

func (b *Beat) setupMonitoring(settings Settings) (report.Reporter, error) {
monitoringCfg, reporterSettings, err := monitoring.SelectConfig(b.Config.MonitoringBeatConfig)
if err != nil {
return nil, err
}
monitoringCfg := b.Config.MonitoringBeatConfig.Monitoring

monitoringClusterUUID, err := monitoring.GetClusterUUID(b.Config.MonitoringBeatConfig.Monitoring)
monitoringClusterUUID, err := monitoring.GetClusterUUID(monitoringCfg)
if err != nil {
return nil, err
}
@@ -914,7 +911,6 @@ func (b *Beat) setupMonitoring(settings Settings) (report.Reporter, error) {

settings := report.Settings{
DefaultUsername: settings.Monitoring.DefaultUsername,
Format: reporterSettings.Format,
ClusterUUID: monitoringClusterUUID,
}
reporter, err := report.New(b.Info, settings, monitoringCfg, b.Config.Output)
15 changes: 6 additions & 9 deletions libbeat/docs/monitoring/monitoring-beats.asciidoc
Original file line number Diff line number Diff line change
@@ -10,10 +10,10 @@

You can use the {stack} {monitor-features} to gain insight into the health of
ifndef::apm-server[]
{beatname_uc} instances running in your environment.
{beatname_uc} instances running in your environment.
endif::[]
ifdef::apm-server[]
{beatname_uc}.
{beatname_uc}.
endif::[]

To monitor {beatname_uc}, make sure monitoring is enabled on your {es} cluster,
@@ -23,18 +23,15 @@ of following methods:
* <<monitoring-internal-collection,Internal collection>> - Internal
collectors send monitoring data directly to your monitoring cluster.
ifndef::serverless[]
* <<monitoring-metricbeat-collection, {metricbeat} collection>> -
* <<monitoring-metricbeat-collection, {metricbeat} collection>> -
{metricbeat} collects monitoring data from your {beatname_uc} instance
and sends it directly to your monitoring cluster.
endif::[]
* <<monitoring-internal-collection-legacy,Legacy collection (deprecated)>> -
Legacy collectors send monitoring data to your production cluster.


//Commenting out this link temporarily until the general monitoring docs can be
//updated.
//To learn about monitoring in general, see
//{ref}/monitor-elasticsearch-cluster.html[Monitor a cluster].
//updated.
//To learn about monitoring in general, see
//{ref}/monitor-elasticsearch-cluster.html[Monitor a cluster].

--

This file was deleted.

138 changes: 0 additions & 138 deletions libbeat/docs/monitoring/shared-monitor-config-legacy.asciidoc

This file was deleted.

52 changes: 0 additions & 52 deletions libbeat/esleg/eslegclient/bulkapi.go
Original file line number Diff line number Diff line change
@@ -30,7 +30,6 @@ import (
"go.elastic.co/apm"
"go.elastic.co/apm/module/apmhttp"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

@@ -94,43 +93,6 @@ func (conn *Connection) Bulk(
return conn.sendBulkRequest(requ)
}

// SendMonitoringBulk creates a HTTP request to the X-Pack Monitoring API containing a bunch of
// operations and sends them to Elasticsearch. The request is retransmitted up to max_retries
// before returning an error.
func (conn *Connection) SendMonitoringBulk(
params map[string]string,
body []interface{},
) (BulkResult, error) {
if len(body) == 0 {
return nil, nil
}

enc := conn.Encoder
enc.Reset()
if err := bulkEncode(conn.log, enc, body); err != nil {
return nil, err
}

if !conn.version.IsValid() {
if err := conn.Connect(); err != nil {
return nil, err
}
}

mergedParams := mergeParams(conn.ConnectionSettings.Parameters, params)

requ, err := newMonitoringBulkRequest(conn.GetVersion(), conn.URL, mergedParams, enc)
if err != nil {
return nil, err
}

_, result, err := conn.sendBulkRequest(requ)
if err != nil {
return nil, err
}
return result, nil
}

func newBulkRequest(
urlStr string,
index, docType string,
@@ -145,20 +107,6 @@ func newBulkRequest(
return newBulkRequestWithPath(urlStr, path, params, body)
}

func newMonitoringBulkRequest(
esVersion common.Version,
urlStr string,
params map[string]string,
body BodyEncoder,
) (*bulkRequest, error) {
path, err := makePath("_monitoring", "bulk", "")
if err != nil {
return nil, err
}

return newBulkRequestWithPath(urlStr, path, params, body)
}

func newBulkRequestWithPath(
urlStr string,
path string,
28 changes: 1 addition & 27 deletions libbeat/monitoring/monitoring.go
Original file line number Diff line number Diff line change
@@ -21,14 +21,11 @@ import (
"errors"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/monitoring/report"
)

// BeatConfig represents the part of the $BEAT.yml to do with monitoring settings
type BeatConfig struct {
XPackMonitoring *common.Config `config:"xpack.monitoring"`
Monitoring *common.Config `config:"monitoring"`
Monitoring *common.Config `config:"monitoring"`
}

type Mode uint8
@@ -42,11 +39,6 @@ const (
Full
)

var (
errMonitoringBothConfigEnabled = errors.New("both xpack.monitoring.* and monitoring.* cannot be set. Prefer to set monitoring.* and set monitoring.elasticsearch.hosts to monitoring cluster hosts")
warnMonitoringDeprecatedConfig = "xpack.monitoring.* settings are deprecated. Use monitoring.* instead, but set monitoring.elasticsearch.hosts to monitoring cluster hosts."
)

// Default is the global default metrics registry provided by the monitoring package.
var Default = NewRegistry()

@@ -85,24 +77,6 @@ func Clear() error {
return Default.Clear()
}

// SelectConfig selects the appropriate monitoring configuration based on the user's settings in $BEAT.yml. Users may either
// use xpack.monitoring.* settings OR monitoring.* settings but not both.
func SelectConfig(beatCfg BeatConfig) (*common.Config, *report.Settings, error) {
switch {
case beatCfg.Monitoring.Enabled() && beatCfg.XPackMonitoring.Enabled():
return nil, nil, errMonitoringBothConfigEnabled
case beatCfg.XPackMonitoring.Enabled():
cfgwarn.Deprecate("8.0.0", warnMonitoringDeprecatedConfig)
monitoringCfg := beatCfg.XPackMonitoring
return monitoringCfg, &report.Settings{Format: report.FormatXPackMonitoringBulk}, nil
case beatCfg.Monitoring.Enabled():
monitoringCfg := beatCfg.Monitoring
return monitoringCfg, &report.Settings{Format: report.FormatBulk}, nil
default:
return nil, nil, nil
}
}

// GetClusterUUID returns the value of the monitoring.cluster_uuid setting, if it is set.
func GetClusterUUID(monitoringCfg *common.Config) (string, error) {
if monitoringCfg == nil {
32 changes: 1 addition & 31 deletions libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
@@ -42,20 +42,17 @@ var createDocPrivAvailableESVersion = common.MustNewVersion("7.5.0")
type publishClient struct {
es *eslegclient.Connection
params map[string]string
format report.Format

log *logp.Logger
}

func newPublishClient(
es *eslegclient.Connection,
params map[string]string,
format report.Format,
) (*publishClient, error) {
p := &publishClient{
es: es,
params: params,
format: format,

log: logp.NewLogger(logSelector),
}
@@ -141,14 +138,7 @@ func (c *publishClient) Publish(ctx context.Context, batch publisher.Batch) erro
}
}

switch c.format {
case report.FormatXPackMonitoringBulk:
err = c.publishXPackBulk(params, event, typ)
case report.FormatBulk:
err = c.publishBulk(ctx, event, typ)
}

if err != nil {
if err := c.publishBulk(ctx, event, typ); err != nil {
failed = append(failed, event)
reason = err
}
@@ -170,26 +160,6 @@ func (c *publishClient) String() string {
return "monitoring(" + c.es.URL + ")"
}

func (c *publishClient) publishXPackBulk(params map[string]string, event publisher.Event, typ string) error {
meta := common.MapStr{
"_index": "",
"_routing": nil,
"_type": typ,
}
bulk := [2]interface{}{
common.MapStr{"index": meta},
report.Event{
Timestamp: event.Content.Timestamp,
Fields: event.Content.Fields,
},
}

// Currently one request per event is sent. Reason is that each event can contain different
// interval params and X-Pack requires to send the interval param.
_, err := c.es.SendMonitoringBulk(params, bulk[:])
return err
}

func (c *publishClient) publishBulk(ctx context.Context, event publisher.Event, typ string) error {
meta := common.MapStr{
"_index": getMonitoringIndexName(),
2 changes: 0 additions & 2 deletions libbeat/monitoring/report/elasticsearch/config.go
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/monitoring/report"
)

// config is subset of libbeat/outputs/elasticsearch config tailored
@@ -46,7 +45,6 @@ type config struct {
BufferSize int `config:"buffer_size"`
Tags []string `config:"tags"`
Backoff backoff `config:"backoff"`
Format report.Format `config:"_format"`
ClusterUUID string `config:"cluster_uuid"`
}

25 changes: 2 additions & 23 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
@@ -19,7 +19,6 @@ package elasticsearch

import (
"errors"
"fmt"
"io"
"math/rand"
"net/url"
@@ -61,12 +60,6 @@ const logSelector = "monitoring"

var errNoMonitoring = errors.New("xpack monitoring not available")

// default x-pack monitoring api parameters
var defaultXPackParams = map[string]string{
"system_id": "beats",
"system_api_version": "7",
}

func init() {
report.RegisterReporterFactory("elasticsearch", makeReporter)
}
@@ -94,18 +87,13 @@ func defaultConfig(settings report.Settings) config {
Init: 1 * time.Second,
Max: 60 * time.Second,
},
Format: report.FormatXPackMonitoringBulk,
ClusterUUID: settings.ClusterUUID,
}

if settings.DefaultUsername != "" {
c.Username = settings.DefaultUsername
}

if settings.Format != report.FormatUnknown {
c.Format = settings.Format
}

return c
}

@@ -168,7 +156,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
}), nil
}

monitoring := monitoring.Default.GetRegistry("xpack.monitoring")
monitoring := monitoring.Default.GetRegistry("monitoring")

outClient := outputs.NewFailoverClient(clients)
outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max)
@@ -345,11 +333,7 @@ func makeClient(
return nil, err
}

if config.Format != report.FormatXPackMonitoringBulk && config.Format != report.FormatBulk {
return nil, fmt.Errorf("unknown reporting format: %v", config.Format)
}

return newPublishClient(esClient, params, config.Format)
return newPublishClient(esClient, params)
}

func closing(log *logp.Logger, c io.Closer) {
@@ -387,11 +371,6 @@ func getClusterUUID() string {
func makeClientParams(config config) map[string]string {
params := map[string]string{}

if config.Format == report.FormatXPackMonitoringBulk {
for k, v := range defaultXPackParams {
params[k] = v
}
}
for k, v := range config.Params {
params[k] = v
}
46 changes: 10 additions & 36 deletions libbeat/monitoring/report/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
@@ -21,46 +21,20 @@ import (
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/monitoring/report"
)

func TestMakeClientParams(t *testing.T) {
tests := map[string]struct {
format report.Format
params map[string]string
expected map[string]string
}{
"format_bulk": {
report.FormatBulk,
map[string]string{
"foo": "bar",
},
map[string]string{
"foo": "bar",
},
},
"format_xpack_monitoring_bulk": {
report.FormatXPackMonitoringBulk,
map[string]string{
"foo": "bar",
},
map[string]string{
"foo": "bar",
"system_id": "beats",
"system_api_version": "7",
},
},
var params, expected map[string]string
params = map[string]string{
"foo": "bar",
}
expected = map[string]string{
"foo": "bar",
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
params := makeClientParams(config{
Format: test.format,
Params: test.params,
})
p := makeClientParams(config{
Params: params,
})

require.Equal(t, test.expected, params)
})
}
require.Equal(t, expected, p)
}
25 changes: 0 additions & 25 deletions libbeat/monitoring/report/report.go
Original file line number Diff line number Diff line change
@@ -27,31 +27,13 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
)

// Format encodes the type of format to report monitoring data in. This
// is currently only being used by the elaticsearch reporter.
// This is a hack that is necessary so we can map certain monitoring
// configuration options to certain behaviors in reporters. Depending on
// the configuration option used, the correct format is set, and reporters
// that know how to interpret the format use it to choose the appropriate
// reporting behavior.
type Format int

// Enumerations of various Formats. A reporter can choose whether to
// interpret this setting or not, and if so, how to interpret it.
const (
FormatUnknown Format = iota // to protect against zero-value errors
FormatXPackMonitoringBulk
FormatBulk
)

type config struct {
// allow for maximum one reporter being configured
Reporter common.ConfigNamespace `config:",inline"`
}

type Settings struct {
DefaultUsername string
Format Format
ClusterUUID string
}

@@ -120,13 +102,6 @@ func getReporterConfig(
hosts := hostsCfg{}
rc.Unpack(&hosts)

if settings.Format == FormatXPackMonitoringBulk && len(hosts.Hosts) > 0 {
pathMonHosts := rc.PathOf("hosts")
pathOutHost := outCfg.PathOf("hosts")
err := fmt.Errorf("'%v' and '%v' are configured", pathMonHosts, pathOutHost)
return "", nil, err
}

merged, err := common.MergeConfigs(outCfg, rc)
if err != nil {
return "", nil, err
7 changes: 0 additions & 7 deletions libbeat/tests/system/config/mockbeat.yml.j2
Original file line number Diff line number Diff line change
@@ -107,13 +107,6 @@ logging.metrics.period: {{ metrics_period }}
keystore.path: {{keystore_path}}
{% endif %}

{% if xpack and xpack.monitoring -%}
#================================ X-Pack Monitoring =====================================
xpack.monitoring.elasticsearch.hosts: {{xpack.monitoring.elasticsearch.hosts}}
xpack.monitoring.elasticsearch.metrics.period: 2s # to speed up tests
xpack.monitoring.elasticsearch.state.period: 3s # to speed up tests
{% endif -%}

{% if monitoring -%}
#================================ X-Pack Monitoring (direct) =====================================
monitoring:
118 changes: 0 additions & 118 deletions libbeat/tests/system/test_monitoring.py
Original file line number Diff line number Diff line change
@@ -20,43 +20,6 @@ def setUp(self):
self.es = Elasticsearch([self.get_elasticsearch_url()])
self.es_monitoring = Elasticsearch([self.get_elasticsearch_monitoring_url()])

@unittest.skipUnless(INTEGRATION_TESTS, "integration test")
@attr('integration')
def test_via_output_cluster(self):
"""
Test shipping monitoring data via the elasticsearch output cluster.
Make sure expected documents are indexed in monitoring cluster.
"""

self.render_config_template(
"mockbeat",
xpack={
"monitoring": {
"elasticsearch": {
"hosts": [self.get_elasticsearch_url()]
}
}
}
)

self.clean_output_cluster()
self.clean_monitoring_cluster()
self.init_output_cluster()

proc = self.start_beat(config="mockbeat.yml")
self.wait_until(lambda: self.log_contains("mockbeat start running."))
self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event")))
self.wait_until(lambda: self.log_contains(re.compile(
"Connection to .*elasticsearch\("+self.get_elasticsearch_url()+"\).* established")))
self.wait_until(lambda: self.monitoring_doc_exists('beats_stats'))
self.wait_until(lambda: self.monitoring_doc_exists('beats_state'))

proc.check_kill_and_wait()

for monitoring_doc_type in ['beats_stats', 'beats_state']:
field_names = ['cluster_uuid', 'timestamp', 'interval_ms', 'type', 'source_node', monitoring_doc_type]
self.assert_monitoring_doc_contains_fields(monitoring_doc_type, field_names)

@unittest.skipUnless(INTEGRATION_TESTS, "integration test")
@attr('integration')
def test_direct_to_monitoring_cluster(self):
@@ -91,69 +54,6 @@ def test_direct_to_monitoring_cluster(self):
field_names = ['cluster_uuid', 'timestamp', 'interval_ms', 'type', monitoring_doc_type]
self.assert_monitoring_doc_contains_fields(monitoring_doc_type, field_names)

@unittest.skipUnless(INTEGRATION_TESTS, "integration test")
@attr('integration')
def test_compare(self):
"""
Test that monitoring docs are the same, regardless of how they are shipped.
"""

self.render_config_template(
"mockbeat",
xpack={
"monitoring": {
"elasticsearch": {
"hosts": [self.get_elasticsearch_url()]
}
}
}
)

self.clean_output_cluster()
self.clean_monitoring_cluster()
self.init_output_cluster()

proc = self.start_beat(config="mockbeat.yml")
self.wait_until(lambda: self.log_contains("mockbeat start running."))
self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event")))
self.wait_until(lambda: self.log_contains(re.compile(
"Connection to .*elasticsearch\("+self.get_elasticsearch_url()+"\).* established")))
self.wait_until(lambda: self.monitoring_doc_exists('beats_stats'))
self.wait_until(lambda: self.monitoring_doc_exists('beats_state'))

proc.check_kill_and_wait()

indirect_beats_stats_doc = self.get_monitoring_doc('beats_stats')
indirect_beats_state_doc = self.get_monitoring_doc('beats_state')

self.render_config_template(
"mockbeat",
monitoring={
"elasticsearch": {
"hosts": [self.get_elasticsearch_monitoring_url()]
}
}
)

self.clean_output_cluster()
self.clean_monitoring_cluster()

proc = self.start_beat(config="mockbeat.yml")
self.wait_until(lambda: self.log_contains("mockbeat start running."))
self.wait_until(lambda: self.log_contains(re.compile("\[monitoring\].*Publish event")))
self.wait_until(lambda: self.log_contains(re.compile(
"Connection to .*elasticsearch\("+self.get_elasticsearch_monitoring_url()+"\).* established")))
self.wait_until(lambda: self.monitoring_doc_exists('beats_stats'))
self.wait_until(lambda: self.monitoring_doc_exists('beats_state'))

proc.check_kill_and_wait()

direct_beats_stats_doc = self.get_monitoring_doc('beats_stats')
direct_beats_state_doc = self.get_monitoring_doc('beats_state')

self.assert_same_structure(indirect_beats_state_doc['beats_state'], direct_beats_state_doc['beats_state'])
self.assert_same_structure(indirect_beats_stats_doc['beats_stats'], direct_beats_stats_doc['beats_stats'])

@unittest.skipUnless(INTEGRATION_TESTS, "integration test")
@attr('integration')
def test_cluster_uuid_setting(self):
@@ -269,24 +169,6 @@ def clean_monitoring_cluster(self):
# Delete any old beats monitoring data
self.es_monitoring.indices.delete(index=".monitoring-beats-*", ignore=[404])

def init_output_cluster(self):
# Setup remote exporter
self.es.cluster.put_settings(body={
"transient": {
"xpack.monitoring.exporters.my_remote": {
"type": "http",
"host": [self.get_elasticsearch_monitoring_url()]
}
}
})

# Enable collection
self.es.cluster.put_settings(body={
"transient": {
"xpack.monitoring.collection.enabled": True
}
})

def get_elasticsearch_monitoring_url(self):
return "http://{host}:{port}".format(
host=os.getenv("ES_MONITORING_HOST", "localhost"),

0 comments on commit a6f180e

Please sign in to comment.