Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional stats fields for Elasticsearch #41944

Merged
merged 18 commits into from
Dec 12, 2024
Merged
21 changes: 21 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32161,6 +32161,27 @@ type: keyword

--

*`elasticsearch.index.tier_preference`*::
+
--
type: keyword

--

*`elasticsearch.index.creation_date`*::
+
--
type: date

--

*`elasticsearch.index.version`*::
+
--
type: keyword

--

*`elasticsearch.index.name`*::
+
--
Expand Down
3 changes: 1 addition & 2 deletions metricbeat/include/list_init.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/cluster_stats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func eventMapping(r mb.ReporterV2, httpClient *helper.HTTP, info elasticsearch.I
}

clusterStateMetrics := []string{"version", "master_node", "nodes", "routing_table"}
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics)
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics, []string{})
if err != nil {
return fmt.Errorf("failed to get cluster state from Elasticsearch: %w", err)
}
Expand Down
34 changes: 32 additions & 2 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,21 @@ func GetLicense(http *helper.HTTP, resetURI string) (*License, error) {
}

// GetClusterState returns cluster state information.
func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (mapstr.M, error) {
func GetClusterState(http *helper.HTTP, resetURI string, metrics []string, filterPaths []string) (mapstr.M, error) {
queryParams := []string{"local=true"}
clusterStateURI := "_cluster/state"
if len(metrics) > 0 {
clusterStateURI += "/" + strings.Join(metrics, ",")
}

content, err := fetchPath(http, resetURI, clusterStateURI, "local=true")
if len(filterPaths) > 0 {
filterPathQueryParam := "filter_path=" + strings.Join(filterPaths, ",")
queryParams = append(queryParams, filterPathQueryParam)
}

queryString := strings.Join(queryParams, "&")

content, err := fetchPath(http, resetURI, clusterStateURI, queryString)
if err != nil {
return nil, err
}
Expand All @@ -304,6 +312,28 @@ func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (maps
return clusterState, err
}

func GetIndexSettings(http *helper.HTTP, resetURI string, indexPattern string, filterPaths []string) (mapstr.M, error) {

queryParams := []string{"local=true", "expand_wildcards=hidden,all"}
indicesSettingsURI := indexPattern + "/_settings"
consulthys marked this conversation as resolved.
Show resolved Hide resolved

if len(filterPaths) > 0 {
filterPathQueryParam := "filter_path=" + strings.Join(filterPaths, ",")
queryParams = append(queryParams, filterPathQueryParam)
}

queryString := strings.Join(queryParams, "&")

content, err := fetchPath(http, resetURI, indicesSettingsURI, queryString)
if err != nil {
return nil, err
}

var indicesSettings map[string]interface{}
err = json.Unmarshal(content, &indicesSettings)
return indicesSettings, err
}

// GetClusterSettingsWithDefaults returns cluster settings.
func GetClusterSettingsWithDefaults(http *helper.HTTP, resetURI string, filterPaths []string) (mapstr.M, error) {
return GetClusterSettings(http, resetURI, true, filterPaths)
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/fields.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions metricbeat/module/elasticsearch/index/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@
}
},
"status": "green",
"tier_preference": "data_content",
"creation_date": 1731657995821,
"version": "8505000",
"hidden": true,
"shards": {
"total": 1,
Expand Down
6 changes: 6 additions & 0 deletions metricbeat/module/elasticsearch/index/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
type: keyword
- name: status
type: keyword
- name: tier_preference
type: keyword
- name: creation_date
type: date
- name: version
type: keyword
- name: name
type: keyword
description: >
Expand Down
105 changes: 97 additions & 8 deletions metricbeat/module/elasticsearch/index/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package index
import (
"encoding/json"
"fmt"
"strconv"

"github.com/joeshaw/multierror"

Expand All @@ -40,9 +41,12 @@ type Index struct {
Primaries primaries `json:"primaries"`
Total total `json:"total"`

Index string `json:"index"`
Status string `json:"status"`
Shards shardStats `json:"shards"`
Index string `json:"index"`
Status string `json:"status"`
TierPreference string `json:"tier_preference"`
CreationDate int `json:"creation_date"`
Version string `json:"version"`
Shards shardStats `json:"shards"`
}

type primaries struct {
Expand Down Expand Up @@ -180,11 +184,19 @@ type bulkStats struct {

func eventsMapping(r mb.ReporterV2, httpClient *helper.HTTP, info elasticsearch.Info, content []byte, isXpack bool) error {
clusterStateMetrics := []string{"routing_table"}
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics)
clusterStateFilterPaths := []string{"routing_table"}
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics, clusterStateFilterPaths)
if err != nil {
return fmt.Errorf("failure retrieving cluster state from Elasticsearch: %w", err)
}

indicesSettingsPattern := "*,.*"
indicesSettingsFilterPaths := []string{"*.settings.index.creation_date", "*.settings.index.**._tier_preference", "*.settings.index.version.created"}
indicesSettings, err := elasticsearch.GetIndexSettings(httpClient, httpClient.GetURI(), indicesSettingsPattern, indicesSettingsFilterPaths)
if err != nil {
return fmt.Errorf("failure retrieving index settings from Elasticsearch: %w", err)
}

var indicesStats stats
if err := parseAPIResponse(content, &indicesStats); err != nil {
return fmt.Errorf("failure parsing Indices Stats Elasticsearch API response: %w", err)
Expand All @@ -204,6 +216,12 @@ func eventsMapping(r mb.ReporterV2, httpClient *helper.HTTP, info elasticsearch.
continue
}

err = addIndexSettings(&idx, indicesSettings)
if err != nil {
errs = append(errs, fmt.Errorf("failure adding index settings: %w", err))
continue
}

event.ModuleFields.Put("cluster.id", info.ClusterID)
event.ModuleFields.Put("cluster.name", info.ClusterName)

Expand Down Expand Up @@ -271,6 +289,63 @@ func addClusterStateFields(idx *Index, clusterState mapstr.M) error {
return nil
}

func addIndexSettings(idx *Index, indicesSettings mapstr.M) error {

// Recover the index settings for our specific index
indexSettingsValue, err := indicesSettings.GetValue(idx.Index)
if err != nil {
return fmt.Errorf("failed to get index settings for index %s: %w", idx.Index, err)
}

indexSettings, ok := indexSettingsValue.(map[string]interface{})
if !ok {
return fmt.Errorf("index settings is not a map for index: %s", idx.Index)
}

indexCreationDate, err := getIndexSettingForIndex(indexSettings, idx.Index, "index.creation_date")
if err != nil {
return fmt.Errorf("failed to get index creation date: %w", err)
}

idx.CreationDate, err = strconv.Atoi(indexCreationDate)
if err != nil {
return fmt.Errorf("failed to convert index creation date to int: %w", err)
}

indexTierPreference, err := getIndexSettingForIndex(indexSettings, idx.Index, "index.routing.allocation.require._tier_preference")
if err != nil {
indexTierPreference, err = getIndexSettingForIndex(indexSettings, idx.Index, "index.routing.allocation.include._tier_preference")
if err != nil {
return fmt.Errorf("failed to get index tier preference: %w", err)
}
}

idx.TierPreference = indexTierPreference

indexVersion, err := getIndexSettingForIndex(indexSettings, idx.Index, "index.version.created")
if err != nil {
return fmt.Errorf("failed to get index version: %w", err)
}

idx.Version = indexVersion

return nil
}

func getIndexSettingForIndex(indexSettings mapstr.M, index, settingKey string) (string, error) {
fieldKey := "settings." + settingKey
value, err := indexSettings.GetValue(fieldKey)
if err != nil {
return "", fmt.Errorf("'"+fieldKey+"': %w", err)
}

setting, ok := value.(string)
if !ok {
return "", elastic.MakeErrorForMissingField(fieldKey, elastic.Elasticsearch)
}
return setting, nil
}

func getClusterStateMetricForIndex(clusterState mapstr.M, index, metricKey string) (mapstr.M, error) {
fieldKey := metricKey + ".indices." + index
value, err := clusterState.GetValue(fieldKey)
Expand Down Expand Up @@ -308,8 +383,15 @@ func getIndexStatus(shards map[string]interface{}) (string, error) {

shard := mapstr.M(s)

isPrimary := shard["primary"].(bool)
state := shard["state"].(string)
isPrimary, ok := shard["primary"].(bool)
if !ok {
return "", fmt.Errorf("%v.shards[%v].primary is not a boolean", indexName, shardIdx)
}

state, ok := shard["state"].(string)
if !ok {
return "", fmt.Errorf("%v.shards[%v].state is not a string", indexName, shardIdx)
}

if isPrimary {
areAllPrimariesStarted = areAllPrimariesStarted && (state == "STARTED")
Expand Down Expand Up @@ -357,8 +439,15 @@ func getIndexShardStats(shards mapstr.M) (*shardStats, error) {

shard := mapstr.M(s)

isPrimary := shard["primary"].(bool)
state := shard["state"].(string)
isPrimary, ok := shard["primary"].(bool)
if !ok {
return nil, fmt.Errorf("%v.shards[%v].primary is not a boolean", indexName, shardIdx)
}

state, ok := shard["state"].(string)
if !ok {
return nil, fmt.Errorf("%v.shards[%v].state is not a string", indexName, shardIdx)
}

if isPrimary {
primaries++
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/tests/system/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def test_index_management(self):
assert len(es.cat.templates(name='metricbeat-*', h='name')) > 0

@unittest.skipUnless(INTEGRATION_TESTS, "integration test")
@pytest.mark.timeout(8*60, func_only=True)
@pytest.mark.timeout(8 * 60, func_only=True)
def test_dashboards(self):
"""
Test that the dashboards can be loaded with `setup --dashboards`
Expand Down
Loading