diff --git a/metricbeat/module/elasticsearch/ml_job/_meta/data.json b/metricbeat/module/elasticsearch/ml_job/_meta/data.json index 93a053224f0..1abbbc55645 100644 --- a/metricbeat/module/elasticsearch/ml_job/_meta/data.json +++ b/metricbeat/module/elasticsearch/ml_job/_meta/data.json @@ -5,12 +5,17 @@ "name": "host.example.com" }, "elasticsearch": { + "cluster": { + "id": "3LbUkLkURz--FR-YO0wLNA", + "name": "es1" + }, "ml": { "job": { "data_counts": { + "invalid_date_count": 0, "processed_record_count": 0 }, - "id": "filebeat-apache2-access-low_request_rate", + "id": "total-requests", "state": "closed" } } @@ -25,4 +30,4 @@ "service": { "name": "elasticsearch" } -} \ No newline at end of file +} diff --git a/metricbeat/module/elasticsearch/ml_job/data.go b/metricbeat/module/elasticsearch/ml_job/data.go index df86ebfe89b..796d1faf987 100644 --- a/metricbeat/module/elasticsearch/ml_job/data.go +++ b/metricbeat/module/elasticsearch/ml_job/data.go @@ -45,7 +45,7 @@ type jobsStruct struct { Jobs []map[string]interface{} `json:"jobs"` } -func eventsMapping(r mb.ReporterV2, content []byte) error { +func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) error { jobsData := &jobsStruct{} err := json.Unmarshal(content, jobsData) @@ -63,6 +63,10 @@ func eventsMapping(r mb.ReporterV2, content []byte) error { event.RootFields = common.MapStr{} event.RootFields.Put("service.name", elasticsearch.ModuleName) + event.ModuleFields = common.MapStr{} + event.ModuleFields.Put("cluster.name", info.ClusterName) + event.ModuleFields.Put("cluster.id", info.ClusterID) + event.MetricSetFields, err = schema.Apply(job) if err != nil { event.Error = errors.Wrap(err, "failure applying ml job schema") diff --git a/metricbeat/module/elasticsearch/ml_job/data_test.go b/metricbeat/module/elasticsearch/ml_job/data_test.go new file mode 100644 index 00000000000..b6081642b9f --- /dev/null +++ b/metricbeat/module/elasticsearch/ml_job/data_test.go @@ -0,0 +1,30 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !integration + +package ml_job + +import ( + "testing" + + "github.com/elastic/beats/metricbeat/module/elasticsearch" +) + +func TestMapper(t *testing.T) { + elasticsearch.TestMapperWithInfo(t, "./_meta/test/ml.*.json", eventsMapping) +} diff --git a/metricbeat/module/elasticsearch/ml_job/data_xpack.go b/metricbeat/module/elasticsearch/ml_job/data_xpack.go index 894c87c35d8..a68d956824b 100644 --- a/metricbeat/module/elasticsearch/ml_job/data_xpack.go +++ b/metricbeat/module/elasticsearch/ml_job/data_xpack.go @@ -31,14 +31,9 @@ import ( "github.com/elastic/beats/metricbeat/module/elasticsearch" ) -func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { - info, err := elasticsearch.GetInfo(m.HTTP, m.HTTP.GetURI()) - if err != nil { - return errors.Wrap(err, "failed to get info from Elasticsearch") - } - +func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error { var data map[string]interface{} - err = json.Unmarshal(content, &data) + err := json.Unmarshal(content, &data) if err != nil { return errors.Wrap(err, "failure parsing Elasticsearch ML Job Stats API response") } diff --git a/metricbeat/module/elasticsearch/ml_job/ml_job.go b/metricbeat/module/elasticsearch/ml_job/ml_job.go index cc568747d9c..453bea1a221 100644 --- a/metricbeat/module/elasticsearch/ml_job/ml_job.go +++ b/metricbeat/module/elasticsearch/ml_job/ml_job.go @@ -58,7 +58,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch methods implements the data gathering and data conversion to the right format func (m *MetricSet) Fetch(r mb.ReporterV2) { - isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+jobPath) + isMaster, err := elasticsearch.IsMaster(m.HTTP, m.getServiceURI()) if err != nil { err = errors.Wrap(err, "error determining if connected Elasticsearch node is master") elastic.ReportAndLogError(err, r, m.Log) @@ -71,6 +71,12 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { return } + info, err := elasticsearch.GetInfo(m.HTTP, m.getServiceURI()) + if err != nil { + elastic.ReportAndLogError(err, r, m.Log) + return + } + content, err := m.HTTP.FetchContent() if err != nil { elastic.ReportAndLogError(err, r, m.Log) @@ -78,9 +84,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { } if m.XPack { - err = eventsMappingXPack(r, m, content) + err = eventsMappingXPack(r, m, *info, content) } else { - err = eventsMapping(r, content) + err = eventsMapping(r, *info, content) } if err != nil { @@ -88,3 +94,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { return } } + +func (m *MetricSet) getServiceURI() string { + return m.HostData().SanitizedURI + jobPath +}