diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c2df0a87786..81428620b9f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -148,6 +148,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix aws.s3.bucket.name terms_field in s3 overview dashboard. {pull}17542[17542] - Fix Unix socket path in memcached. {pull}17512[17512] - Fix vsphere VM dashboard host aggregation visualizations. {pull}17555[17555] +- Metricbeat no longer needs to be started strictly after Logstash for `logstash-xpack` module to report correct data. {issue}17261[17261] {pull}17497[17497] *Packetbeat* diff --git a/metricbeat/module/logstash/_meta/Dockerfile b/metricbeat/module/logstash/_meta/Dockerfile index f363b845de9..bef3323664c 100644 --- a/metricbeat/module/logstash/_meta/Dockerfile +++ b/metricbeat/module/logstash/_meta/Dockerfile @@ -2,5 +2,7 @@ ARG LOGSTASH_VERSION FROM docker.elastic.co/logstash/logstash:${LOGSTASH_VERSION} COPY healthcheck.sh / +COPY pipeline/logstash.conf /usr/share/logstash/pipeline/logstash.conf + ENV XPACK_MONITORING_ENABLED=FALSE HEALTHCHECK --interval=1s --retries=300 CMD sh /healthcheck.sh diff --git a/metricbeat/module/logstash/_meta/pipeline/logstash.conf b/metricbeat/module/logstash/_meta/pipeline/logstash.conf new file mode 100644 index 00000000000..b1015fd0f27 --- /dev/null +++ b/metricbeat/module/logstash/_meta/pipeline/logstash.conf @@ -0,0 +1,11 @@ +input { + beats { + port => 5044 + } +} + +output { + elasticsearch { + hosts => ["${ES_HOST:elasticsearch}:${ES_PORT:9200}"] + } +} diff --git a/metricbeat/module/logstash/docker-compose.yml b/metricbeat/module/logstash/docker-compose.yml index 40abaad93e9..ab3284bfa16 100644 --- a/metricbeat/module/logstash/docker-compose.yml +++ b/metricbeat/module/logstash/docker-compose.yml @@ -9,3 +9,18 @@ services: LOGSTASH_VERSION: ${LOGSTASH_VERSION:-7.5.2} ports: - 9600 + depends_on: + - elasticsearch + + elasticsearch: + image: docker.elastic.co/integrations-ci/beats-elasticsearch:${ELASTICSEARCH_VERSION:-7.5.2}-1 + build: + context: ../elasticsearch/_meta + args: + ELASTICSEARCH_VERSION: ${ELASTICSEARCH_VERSION:-7.5.2} + environment: + - "network.host=" + - "transport.host=127.0.0.1" + - "http.host=0.0.0.0" + ports: + - 9200 diff --git a/metricbeat/module/logstash/logstash_integration_test.go b/metricbeat/module/logstash/logstash_integration_test.go index 30dca909a1e..ffaed41a4e6 100644 --- a/metricbeat/module/logstash/logstash_integration_test.go +++ b/metricbeat/module/logstash/logstash_integration_test.go @@ -20,6 +20,9 @@ package logstash_test import ( + "encoding/json" + "io/ioutil" + "net/http" "testing" "github.com/stretchr/testify/require" @@ -68,15 +71,17 @@ func TestData(t *testing.T) { } func TestXPackEnabled(t *testing.T) { - service := compose.EnsureUpWithTimeout(t, 300, "logstash") + lsService := compose.EnsureUpWithTimeout(t, 300, "logstash") + esService := compose.EnsureUpWithTimeout(t, 300, "elasticsearch") + + clusterUUID := getESClusterUUID(t, esService.Host()) metricSetToTypeMap := map[string]string{ "node": "logstash_state", "node_stats": "logstash_stats", } - config := getXPackConfig(service.Host()) - + config := getXPackConfig(lsService.Host()) metricSets := mbtest.NewReportingMetricSetV2Errors(t, config) for _, metricSet := range metricSets { events, errs := mbtest.ReportingFetchV2Error(metricSet) @@ -85,6 +90,7 @@ func TestXPackEnabled(t *testing.T) { event := events[0] require.Equal(t, metricSetToTypeMap[metricSet.Name()], event.RootFields["type"]) + require.Equal(t, clusterUUID, event.RootFields["cluster_uuid"]) require.Regexp(t, `^.monitoring-logstash-\d-mb`, event.Index) } } @@ -105,3 +111,19 @@ func getXPackConfig(host string) map[string]interface{} { "xpack.enabled": true, } } + +func getESClusterUUID(t *testing.T, host string) string { + resp, err := http.Get("http://" + host + "/") + require.NoError(t, err) + defer resp.Body.Close() + + var body struct { + ClusterUUID string `json:"cluster_uuid"` + } + + data, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + json.Unmarshal(data, &body) + + return body.ClusterUUID +} diff --git a/metricbeat/module/logstash/node_stats/node_stats.go b/metricbeat/module/logstash/node_stats/node_stats.go index 29a5a7d59c3..5b2c37e5eeb 100644 --- a/metricbeat/module/logstash/node_stats/node_stats.go +++ b/metricbeat/module/logstash/node_stats/node_stats.go @@ -18,7 +18,7 @@ package node_stats import ( - "sync" + "net/url" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" @@ -50,7 +50,6 @@ var ( // MetricSet type defines all fields of the MetricSet type MetricSet struct { *logstash.MetricSet - initialized sync.Once } // New create a new instance of the MetricSet @@ -69,11 +68,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch(r mb.ReporterV2) error { - var err error - m.initialized.Do(func() { - err = m.init() - }) - if err != nil { + if err := m.updateServiceURI(); err != nil { if m.XPack { m.Logger().Error(err) return nil @@ -102,15 +97,37 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { return nil } -func (m *MetricSet) init() error { - if m.XPack { - err := m.CheckPipelineGraphAPIsAvailable() - if err != nil { - return err - } - - m.HTTP.SetURI(m.HTTP.GetURI() + "?vertices=true") +func (m *MetricSet) updateServiceURI() error { + u, err := getServiceURI(m.GetURI(), m.XPack, m.CheckPipelineGraphAPIsAvailable) + if err != nil { + return err } + m.HTTP.SetURI(u) return nil + +} + +func getServiceURI(currURI string, xpackEnabled bool, graphAPIsAvailable func() error) (string, error) { + if !xpackEnabled { + // No need to request pipeline vertices from service API + return currURI, nil + } + + if err := graphAPIsAvailable(); err != nil { + return "", err + } + + u, err := url.Parse(currURI) + if err != nil { + return "", err + } + + q := u.Query() + if q.Get("vertices") == "" { + q.Set("vertices", "true") + } + + u.RawQuery = q.Encode() + return u.String(), nil } diff --git a/metricbeat/module/logstash/node_stats/node_stats_test.go b/metricbeat/module/logstash/node_stats/node_stats_test.go new file mode 100644 index 00000000000..8c11ecdde3d --- /dev/null +++ b/metricbeat/module/logstash/node_stats/node_stats_test.go @@ -0,0 +1,89 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package node_stats + +import ( + "errors" + "testing" + "testing/quick" + + "github.com/stretchr/testify/require" +) + +func TestGetServiceURI(t *testing.T) { + tests := map[string]struct { + currURI string + xpackEnabled bool + graphAPIsAvailable func() error + expectedURI string + errExpected bool + }{ + "xpack_disabled": { + currURI: "/_node/stats", + xpackEnabled: false, + graphAPIsAvailable: func() error { return nil }, + expectedURI: "/_node/stats", + errExpected: false, + }, + "apis_unavailable": { + currURI: "/_node/stats", + xpackEnabled: true, + graphAPIsAvailable: func() error { return errors.New("test") }, + expectedURI: "", + errExpected: true, + }, + "with_pipeline_vertices": { + currURI: "_node/stats", + xpackEnabled: true, + graphAPIsAvailable: func() error { return nil }, + expectedURI: "/_node/stats?vertices=true", + errExpected: false, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + newURI, err := getServiceURI(nodeStatsPath, test.xpackEnabled, test.graphAPIsAvailable) + if test.errExpected { + require.Equal(t, "", newURI) + } else { + require.NoError(t, err) + require.Equal(t, test.expectedURI, newURI) + } + }) + } +} + +// See https://github.com/elastic/beats/issues/15974 +func TestGetServiceURIMultipleCalls(t *testing.T) { + err := quick.Check(func(r uint) bool { + var err error + uri := "_node/stats" + + numCalls := 2 + (r % 10) // between 2 and 11 + for i := uint(0); i < numCalls; i++ { + uri, err = getServiceURI(uri, true, func() error { return nil }) + if err != nil { + return false + } + } + + return err == nil && uri == "_node/stats?vertices=true" + }, nil) + require.NoError(t, err) +} diff --git a/testing/environments/local.yml b/testing/environments/local.yml index 592ff2d0ab8..7d588a82987 100644 --- a/testing/environments/local.yml +++ b/testing/environments/local.yml @@ -16,6 +16,7 @@ services: ports: - "127.0.0.1:5044:5044" - "127.0.0.1:5055:5055" + - "127.0.0.1:9600:9600" depends_on: elasticsearch: condition: service_healthy