Skip to content

Commit

Permalink
Get rid of errors generated by xpack code for elasticsearch/shard m…
Browse files Browse the repository at this point in the history
…etricset (elastic#8192) (elastic#8285)

* Refactoring: Extract passthruFields function for broader use

* Passthru fields that could sometimes by null

* Do not report errors in xpack path as they will go into metricbeat index

* Un-hardcode index name

* More fixes

* Regenerating data.json

* Fixing order

* Renaming data JSON file to include version

* Renaming data.700.json back to data.json

* Adding unit test fixture for 7.0.0
  • Loading branch information
ycombinator authored Sep 13, 2018
1 parent 3ca388f commit e6220c2
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 30 deletions.
10 changes: 10 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -4419,6 +4419,16 @@ type: keyword
The state of this shard.
--
*`elasticsearch.shard.relocating_node.name`*::
+
--
type: keyword
The node the shard was relocated from.
--
[[exported-fields-envoyproxy]]
Expand Down
12 changes: 1 addition & 11 deletions metricbeat/module/elasticsearch/cluster_stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,6 @@ import (
"github.com/elastic/beats/metricbeat/mb"
)

func passthruField(fieldPath string, sourceData, targetData common.MapStr) error {
fieldValue, err := sourceData.GetValue(fieldPath)
if err != nil {
return elastic.MakeErrorForMissingField(fieldPath, elastic.Elasticsearch)
}

targetData.Put(fieldPath, fieldValue)
return nil
}

func clusterNeedsTLSEnabled(license, stackStats common.MapStr) (bool, error) {
// TLS does not need to be enabled if license type is something other than trial
value, err := license.GetValue("license.type")
Expand Down Expand Up @@ -184,7 +174,7 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
return err
}

if err = passthruField("status", clusterStats, clusterState); err != nil {
if err = elasticsearch.PassThruField("status", clusterStats, clusterState); err != nil {
return err
}

Expand Down
13 changes: 13 additions & 0 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/helper/elastic"
)

// Global clusterIdCache. Assumption is that the same node id never can belong to a different cluster id
Expand Down Expand Up @@ -224,6 +225,18 @@ func GetStackUsage(http *helper.HTTP, resetURI string) (common.MapStr, error) {
return stackUsage, err
}

// PassThruField copies the field at the given path from the given source data object into
// the same path in the given target data object
func PassThruField(fieldPath string, sourceData, targetData common.MapStr) error {
fieldValue, err := sourceData.GetValue(fieldPath)
if err != nil {
return elastic.MakeErrorForMissingField(fieldPath, elastic.Elasticsearch)
}

targetData.Put(fieldPath, fieldValue)
return nil
}

// Global cache for license information. Assumption is that license information changes infrequently
var licenseCache = &_licenseCache{}

Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/fields.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions metricbeat/module/elasticsearch/shard/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
"cluster": {
"name": "elasticsearch",
"state": {
"id": "dJaHX6fxSqSVMOsL4QZwwQ"
"id": "MBE4XrQOSf6ScXRTuCO1Pw"
}
},
"index": {
"name": "filebeat-7.0.0-alpha1-2018.05.09"
"name": "heartbeat-7.0.0-alpha1-2018.08.27"
},
"node": {
"name": "523zXyT6TRWiqXcQItnkyQ"
"name": "Z4hBonPxQVW9qPKEHpwWCg"
},
"shard": {
"number": 0,
"primary": true,
"state": "STARTED"
}
Expand Down
5 changes: 4 additions & 1 deletion metricbeat/module/elasticsearch/shard/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
type: long
description: >
The number of this shard.
- name: state
type: keyword
description: >
The state of this shard.
- name: relocating_node.name
type: keyword
description: >
The node the shard was relocated from.
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
{
"cluster_name": "elasticsearch",
"compressed_size_in_bytes": 11865,
"cluster_uuid": "d0xWWzE8S_2x3Z-lMagVxw",
"version": 26,
"state_uuid": "K6wBBv1ZQk-1HgpnPx5awQ",
"master_node": "PL_V6KgiSXCO-vI2Rm-B_A",
"routing_table": {
"indices": {
".monitoring-es-6-2018.09.05": {
"shards": {
"0": [
{
"state": "STARTED",
"primary": true,
"node": "PL_V6KgiSXCO-vI2Rm-B_A",
"relocating_node": null,
"shard": 0,
"index": ".monitoring-es-6-2018.09.05",
"allocation_id": {
"id": "mOtgQ2xsTGWbRSlRbkbrcA"
}
},
{
"state": "UNASSIGNED",
"primary": false,
"node": null,
"relocating_node": null,
"shard": 0,
"index": ".monitoring-es-6-2018.09.05",
"recovery_source": {
"type": "PEER"
},
"unassigned_info": {
"reason": "REPLICA_ADDED",
"at": "2018-09-05T16:23:21.269Z",
"delayed": false,
"allocation_status": "no_attempt"
}
}
]
}
},
"testindex": {
"shards": {
"0": [
{
"state": "STARTED",
"primary": true,
"node": "PL_V6KgiSXCO-vI2Rm-B_A",
"relocating_node": null,
"shard": 0,
"index": "testindex",
"allocation_id": {
"id": "epzwd1uKQGSLa745KCBYQw"
}
},
{
"state": "INITIALIZING",
"primary": false,
"node": "DdU2Zw-gSRCmAi0Kp46aCg",
"relocating_node": null,
"shard": 0,
"index": "testindex",
"recovery_source": {
"type": "PEER"
},
"allocation_id": {
"id": "foAIE9r0S-Ob8vjstFg1qA"
},
"unassigned_info": {
"reason": "INDEX_CREATED",
"at": "2018-09-05T16:22:02.488Z",
"delayed": false,
"allocation_status": "no_attempt"
}
}
]
}
},
".kibana": {
"shards": {
"0": [
{
"state": "STARTED",
"primary": true,
"node": "PL_V6KgiSXCO-vI2Rm-B_A",
"relocating_node": null,
"shard": 0,
"index": ".kibana",
"allocation_id": {
"id": "R4ydh8SOS4iPcKsbgMwkoA"
}
},
{
"state": "UNASSIGNED",
"primary": false,
"node": null,
"relocating_node": null,
"shard": 0,
"index": ".kibana",
"recovery_source": {
"type": "PEER"
},
"unassigned_info": {
"reason": "REPLICA_ADDED",
"at": "2018-09-05T16:23:21.269Z",
"delayed": false,
"allocation_status": "no_attempt"
}
}
]
}
}
}
}
}
38 changes: 31 additions & 7 deletions metricbeat/module/elasticsearch/shard/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,15 @@ import (
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)

var (
schema = s.Schema{
"state": c.Str("state"),
"primary": c.Bool("primary"),
"node": c.Str("node"),
"index": c.Str("index"),
"shard": c.Int("number"),
"relocating_node": c.Str("relocating_node"),
"state": c.Str("state"),
"primary": c.Bool("primary"),
"index": c.Str("index"),
"shard": c.Int("shard"),
}
)

Expand Down Expand Up @@ -61,13 +60,38 @@ func eventsMapping(r mb.ReporterV2, content []byte) {
for _, shard := range shards {
event := mb.Event{}

fields, _ := schema.Apply(shard)
fields, err := schema.Apply(shard)
if err != nil {
r.Error(err)
continue
}

// Handle node field: could be string or null
err = elasticsearch.PassThruField("node", shard, fields)
if err != nil {
continue
}

// Handle relocating_node field: could be string or null
err = elasticsearch.PassThruField("relocating_node", shard, fields)
if err != nil {
continue
}

event.ModuleFields = common.MapStr{}
event.ModuleFields.Put("node.name", fields["node"])
delete(fields, "node")

event.ModuleFields.Put("index.name", fields["index"])
delete(fields, "index")

event.MetricSetFields = fields
event.MetricSetFields.Put("number", fields["shard"])
delete(event.MetricSetFields, "shard")

delete(event.MetricSetFields, "relocating_node")
event.MetricSetFields.Put("relocating_node.name", fields["relocating_node"])

event.ModuleFields.Put("cluster.state.id", stateData.StateID)
event.ModuleFields.Put("cluster.name", stateData.ClusterName)

Expand Down
20 changes: 13 additions & 7 deletions metricbeat/module/elasticsearch/shard/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)
Expand All @@ -30,21 +31,18 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) {
stateData := &stateStruct{}
err := json.Unmarshal(content, stateData)
if err != nil {
r.Error(err)
return
}

nodeInfo, err := elasticsearch.GetNodeInfo(m.HTTP, m.HostData().SanitizedURI+statePath, stateData.MasterNode)
if err != nil {
r.Error(err)
return
}

// TODO: This is currently needed because the cluser_uuid is `na` in stateData in case not the full state is requested.
// Will be fixed in: https://github.com/elastic/elasticsearch/pull/30656
clusterID, err := elasticsearch.GetClusterID(m.HTTP, m.HostData().SanitizedURI+statePath, stateData.MasterNode)
if err != nil {
r.Error(err)
return
}

Expand All @@ -64,12 +62,20 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) {
event := mb.Event{}
fields, err := schema.Apply(shard)
if err != nil {
r.Error(err)
continue
}

fields["shard"] = fields["number"]
delete(fields, "number")
// Handle node field: could be string or null
err = elasticsearch.PassThruField("node", shard, fields)
if err != nil {
continue
}

// Handle relocating_node field: could be string or null
err = elasticsearch.PassThruField("relocating_node", shard, fields)
if err != nil {
continue
}

event.RootFields = common.MapStr{}

Expand All @@ -82,7 +88,7 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) {
"shard": fields,
"state_uuid": stateData.StateID,
}
event.Index = ".monitoring-es-6-mb"
event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch)

r.Event(event)

Expand Down

0 comments on commit e6220c2

Please sign in to comment.