Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.5](backport #33457) [Metricbeat][elasticsearch] Use replica number in doc ID for shard metricset #33695

Merged
merged 1 commit into from
Nov 16, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 27 additions & 23 deletions metricbeat/module/elasticsearch/shard/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import (
"github.com/elastic/beats/v7/metricbeat/helper/elastic"
"github.com/elastic/elastic-agent-libs/mapstr"

"fmt"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

s "github.com/elastic/beats/v7/libbeat/common/schema"
c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface"
Expand Down Expand Up @@ -61,45 +62,45 @@ func eventsMapping(r mb.ReporterV2, content []byte, isXpack bool) error {
stateData := &stateStruct{}
err := json.Unmarshal(content, stateData)
if err != nil {
return errors.Wrap(err, "failure parsing Elasticsearch Cluster State API response")
return fmt.Errorf("failure parsing Elasticsearch Cluster State API response: %w", err)
}

var errs multierror.Errors
for _, index := range stateData.RoutingTable.Indices {
for _, shards := range index.Shards {
for _, shard := range shards {
for i, shard := range shards {
event := mb.Event{
ModuleFields: mapstr.M{},
}

event.ModuleFields.Put("cluster.state.id", stateData.StateID)
event.ModuleFields.Put("cluster.stats.state.state_uuid", stateData.StateID)
event.ModuleFields.Put("cluster.id", stateData.ClusterID)
event.ModuleFields.Put("cluster.name", stateData.ClusterName)
_, _ = event.ModuleFields.Put("cluster.state.id", stateData.StateID)
_, _ = event.ModuleFields.Put("cluster.stats.state.state_uuid", stateData.StateID)
_, _ = event.ModuleFields.Put("cluster.id", stateData.ClusterID)
_, _ = event.ModuleFields.Put("cluster.name", stateData.ClusterName)

fields, err := schema.Apply(shard)
if err != nil {
errs = append(errs, errors.Wrap(err, "failure applying shard schema"))
errs = append(errs, fmt.Errorf("failure applying shard schema: %w", err))
continue
}

// Handle node field: could be string or null
err = elasticsearch.PassThruField("node", shard, fields)
if err != nil {
errs = append(errs, errors.Wrap(err, "failure passing through node field"))
errs = append(errs, fmt.Errorf("failure passing through node field: %w", err))
continue
}

// Handle relocating_node field: could be string or null
err = elasticsearch.PassThruField("relocating_node", shard, fields)
if err != nil {
errs = append(errs, errors.Wrap(err, "failure passing through relocating_node field"))
errs = append(errs, fmt.Errorf("failure passing through relocating_node field: %w", err))
continue
}

event.ID, err = generateHashForEvent(stateData.StateID, fields)
event.ID, err = generateHashForEvent(stateData.StateID, fields, i)
if err != nil {
errs = append(errs, errors.Wrap(err, "failure getting event ID"))
errs = append(errs, fmt.Errorf("failure getting event ID: %w", err))
continue
}

Expand All @@ -110,28 +111,28 @@ func eventsMapping(r mb.ReporterV2, content []byte, isXpack bool) error {
continue
}
if nodeID != nil { // shard has not been allocated yet
event.ModuleFields.Put("node.id", nodeID)
_, _ = event.ModuleFields.Put("node.id", nodeID)
delete(fields, "node")

sourceNode, err := getSourceNode(nodeID.(string), stateData)
if err != nil {
errs = append(errs, errors.Wrap(err, "failure getting source node information"))
errs = append(errs, fmt.Errorf("failure getting source node information: %w", err))
continue
}
event.ModuleFields.Put("node.name", sourceNode["name"])
event.MetricSetFields.Put("source_node", sourceNode)
_, _ = event.ModuleFields.Put("node.name", sourceNode["name"])
_, _ = event.MetricSetFields.Put("source_node", sourceNode)
}

event.ModuleFields.Put("index.name", fields["index"])
_, _ = event.ModuleFields.Put("index.name", fields["index"])
delete(fields, "index")

event.MetricSetFields.Put("number", fields["shard"])
_, _ = event.MetricSetFields.Put("number", fields["shard"])
delete(event.MetricSetFields, "shard")

delete(event.MetricSetFields, "relocating_node")
relocatingNode := fields["relocating_node"]
event.MetricSetFields.Put("relocating_node.name", relocatingNode)
event.MetricSetFields.Put("relocating_node.id", relocatingNode)
_, _ = event.MetricSetFields.Put("relocating_node.name", relocatingNode)
_, _ = event.MetricSetFields.Put("relocating_node.id", relocatingNode)

// xpack.enabled in config using standalone metricbeat writes to `.monitoring` instead of `metricbeat-*`
// When using Agent, the index name is overwritten anyways.
Expand Down Expand Up @@ -160,7 +161,10 @@ func getSourceNode(nodeID string, stateData *stateStruct) (mapstr.M, error) {
}, nil
}

func generateHashForEvent(stateID string, shard mapstr.M) (string, error) {
// Note: This function may generate duplicate IDs, but those will be dropped since libbeat
// ignores the 409 status code
// https://github.com/elastic/beats/blob/main/libbeat/outputs/elasticsearch/client.go#L396
func generateHashForEvent(stateID string, shard mapstr.M, index int) (string, error) {
var nodeID string
if shard["node"] == nil {
nodeID = "_na"
Expand All @@ -181,7 +185,7 @@ func generateHashForEvent(stateID string, shard mapstr.M) (string, error) {
if !ok {
return "", elastic.MakeErrorForMissingField("shard", elastic.Elasticsearch)
}
shardNumberStr := strconv.FormatInt(shardNumberInt, 10)
shardNumberStr := "s" + strconv.FormatInt(shardNumberInt, 10)

isPrimary, ok := shard["primary"].(bool)
if !ok {
Expand All @@ -191,7 +195,7 @@ func generateHashForEvent(stateID string, shard mapstr.M) (string, error) {
if isPrimary {
shardType = "p"
} else {
shardType = "r"
shardType = "r" + strconv.Itoa(index)
}

return stateID + ":" + nodeID + ":" + indexName + ":" + shardNumberStr + ":" + shardType, nil
Expand Down