From 083212ebffe934375c8e61bd91b5beb3e980c000 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 19 Mar 2020 12:19:24 -0700 Subject: [PATCH] [7.6] Reduce memory usage of `elasticsearch.index` metricset (#16538) (#17070) * Reduce memory usage of `elasticsearch.index` metricset (#16538) * [Debugging only!] Add charts to see memory usage over time. THIS COMMIT MUST BE REMOVED AT THE END!!! * [Debugging only] Disable validation so we can test only index metricset * Streaming parser * Removing debugcharts * Replace maps with structs * Running go mod tidy * Pass pointer * Uncomment code * Reverting unnecessary changes * Adding CHANGELOG entry * Incorporating benchmark test * Removing unnecessary nil check * Fixing up CHANGELOG * Fixing up bad rebase --- CHANGELOG.next.asciidoc | 1 + .../module/elasticsearch/index/data_xpack.go | 210 ++++++++++-------- .../elasticsearch/index/data_xpack_test.go | 4 +- .../module/elasticsearch/index/index.go | 2 +- 4 files changed, 118 insertions(+), 99 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f4479918a87..6ea0743229f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -70,6 +70,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Revert changes in `docker` module: add size flag to docker.container. {pull}16600[16600] - Convert increments of 100 nanoseconds/ticks to milliseconds for WriteTime and ReadTime in diskio metricset (Windows) for consistency. {issue}14233[14233] - Fix diskio issue for windows 32 bit on disk_performance struct alignment. {issue}16680[16680] +- Reduce memory usage in `elasticsearch/index` metricset. {issue}16503[16503] {pull}16538[16538] *Packetbeat* diff --git a/metricbeat/module/elasticsearch/index/data_xpack.go b/metricbeat/module/elasticsearch/index/data_xpack.go index 2ad4c711c78..6452129bdf2 100644 --- a/metricbeat/module/elasticsearch/index/data_xpack.go +++ b/metricbeat/module/elasticsearch/index/data_xpack.go @@ -27,96 +27,117 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/libbeat/common" - s "github.com/elastic/beats/libbeat/common/schema" - c "github.com/elastic/beats/libbeat/common/schema/mapstriface" "github.com/elastic/beats/metricbeat/helper/elastic" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/elasticsearch" ) var ( - // Based on https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java#L127-L203 - xpackSchema = s.Schema{ - "uuid": c.Str("uuid"), - "primaries": c.Dict("primaries", indexStatsSchema), - "total": c.Dict("total", indexStatsSchema), - } + errParse = errors.New("failure parsing Indices Stats Elasticsearch API response") +) - indexStatsSchema = s.Schema{ - "docs": c.Dict("docs", s.Schema{ - "count": c.Int("count"), - }), - "fielddata": c.Dict("fielddata", s.Schema{ - "memory_size_in_bytes": c.Int("memory_size_in_bytes"), - "evictions": c.Int("evictions"), - }), - "indexing": c.Dict("indexing", s.Schema{ - "index_total": c.Int("index_total"), - "index_time_in_millis": c.Int("index_time_in_millis"), - "throttle_time_in_millis": c.Int("throttle_time_in_millis"), - }), - "merges": c.Dict("merges", s.Schema{ - "total_size_in_bytes": c.Int("total_size_in_bytes"), - }), - "query_cache": c.Dict("query_cache", cacheStatsSchema), - "request_cache": c.Dict("request_cache", cacheStatsSchema), - "search": c.Dict("search", s.Schema{ - "query_total": c.Int("query_total"), - "query_time_in_millis": c.Int("query_time_in_millis"), - }), - "segments": c.Dict("segments", s.Schema{ - "count": c.Int("count"), - "memory_in_bytes": c.Int("memory_in_bytes"), - "terms_memory_in_bytes": c.Int("terms_memory_in_bytes"), - "stored_fields_memory_in_bytes": c.Int("stored_fields_memory_in_bytes"), - "term_vectors_memory_in_bytes": c.Int("term_vectors_memory_in_bytes"), - "norms_memory_in_bytes": c.Int("norms_memory_in_bytes"), - "points_memory_in_bytes": c.Int("points_memory_in_bytes"), - "doc_values_memory_in_bytes": c.Int("doc_values_memory_in_bytes"), - "index_writer_memory_in_bytes": c.Int("index_writer_memory_in_bytes"), - "version_map_memory_in_bytes": c.Int("version_map_memory_in_bytes"), - "fixed_bit_set_memory_in_bytes": c.Int("fixed_bit_set_memory_in_bytes"), - }), - "store": c.Dict("store", s.Schema{ - "size_in_bytes": c.Int("size_in_bytes"), - }), - "refresh": c.Dict("refresh", s.Schema{ - "external_total_time_in_millis": c.Int("external_total_time_in_millis", s.Optional), - "total_time_in_millis": c.Int("total_time_in_millis"), - }), - } +// Based on https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java#L127-L203 +type stats struct { + Indices map[string]index `json:"indices"` +} - cacheStatsSchema = s.Schema{ - "memory_size_in_bytes": c.Int("memory_size_in_bytes"), - "evictions": c.Int("evictions"), - "hit_count": c.Int("hit_count"), - "miss_count": c.Int("miss_count"), - } -) +type index struct { + UUID string `json:"uuid"` + Primaries indexStats `json:"primaries"` + Total indexStats `json:"total"` -func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error { - var indicesStruct IndicesStruct - if err := parseAPIResponse(content, &indicesStruct); err != nil { - return errors.Wrap(err, "failure parsing Indices Stats Elasticsearch API response") - } + Index string `json:"index"` + Created int64 `json:"created"` + Status string `json:"status"` + Shards shardStats `json:"shards"` +} + +type indexStats struct { + Docs struct { + Count int `json:"count"` + } `json:"docs"` + FieldData struct { + MemorySizeInBytes int `json:"memory_size_in_bytes"` + Evictions int `json:"evictions"` + } `json:"fielddata"` + Indexing struct { + IndexTotal int `json:"index_total"` + IndexTimeInMillis int `json:"index_time_in_millis"` + ThrottleTimeInMillis int `json:"throttle_time_in_millis"` + } `json:"indexing"` + Merges struct { + TotalSizeInBytes int `json:"total_size_in_bytes"` + } `json:"merges"` + QueryCache cacheStats `json:"query_stats"` + RequestCache cacheStats `json:"request_cache"` + Search struct { + QueryTotal int `json:"query_total"` + QueryTimeInMillis int `json:"query_time_in_millis"` + } `json:"search"` + Segments struct { + Count int `json:"count"` + MemoryInBytes int `json:"memory_in_bytes"` + TermsMemoryInBytes int `json:"terms_memory_in_bytes"` + StoredFieldsMemoryInBytes int `json:"stored_fields_memory_in_bytes"` + TermVectorsMemoryInBytes int `json:"term_vectors_memory_in_bytes"` + NormsMemoryInBytes int `json:"norms_memory_in_bytes"` + PointsMemoryInBytes int `json:"points_memory_in_bytes"` + DocValuesMemoryInBytes int `json:"doc_values_memory_in_bytes"` + IndexWriterMemoryInBytes int `json:"index_writer_memory_in_bytes"` + VersionMapMemoryInBytes int `json:"version_map_memory_in_bytes"` + FixedBitSetMemoryInBytes int `json:"fixed_bit_set_memory_in_bytes"` + } `json:"segments"` + Store struct { + SizeInBytes int `json:"size_in_bytes"` + } `json:"store"` + Refresh struct { + ExternalTotalTimeInMillis int `json:"external_total_time_in_millis"` + TotalTimeInMillis int `json:"total_time_in_millis"` + } `json:"refresh"` +} + +type cacheStats struct { + MemorySizeInBytes int `json:"memory_size_in_bytes"` + Evictions int `json:"evictions"` + HitCount int `json:"hit_count"` + MissCount int `json:"miss_count"` +} + +type shardStats struct { + Total int `json:"total"` + Primaries int `json:"primaries"` + Replicas int `json:"replicas"` + ActiveTotal int `json:"active_total"` + ActivePrimaries int `json:"active_primaries"` + ActiveReplicas int `json:"active_replicas"` + + UnassignedTotal int `json:"unassigned_total"` + UnassignedPrimaries int `json:"unassigned_primaries"` + UnassignedReplicas int `json:"unassigned_replicas"` + + Initializing int `json:"initializing"` + Relocating int `json:"relocationg"` +} + +func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error { clusterStateMetrics := []string{"metadata", "routing_table"} clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI(), clusterStateMetrics) if err != nil { return errors.Wrap(err, "failure retrieving cluster state from Elasticsearch") } + var indicesStats stats + if err := parseAPIResponse(content, &indicesStats); err != nil { + return errors.Wrap(err, "failure parsing Indices Stats Elasticsearch API response") + } + var errs multierror.Errors - for name, index := range indicesStruct.Indices { + for name, idx := range indicesStats.Indices { event := mb.Event{} - indexStats, err := xpackSchema.Apply(index) - if err != nil { - errs = append(errs, errors.Wrap(err, "failure applying index stats schema")) - continue - } - indexStats["index"] = name + idx.Index = name - err = addClusterStateFields(name, indexStats, clusterState) + err = addClusterStateFields(&idx, clusterState) if err != nil { errs = append(errs, errors.Wrap(err, "failure adding cluster state fields")) continue @@ -127,7 +148,7 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, "timestamp": common.Time(time.Now()), "interval_ms": m.Module().Config().Period / time.Millisecond, "type": "index_stats", - "index_stats": indexStats, + "index_stats": idx, } event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch) @@ -137,19 +158,19 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, return errs.Err() } -func parseAPIResponse(content []byte, indicesStruct *IndicesStruct) error { - return json.Unmarshal(content, indicesStruct) +func parseAPIResponse(content []byte, indicesStats *stats) error { + return json.Unmarshal(content, indicesStats) } // Fields added here are based on same fields being added by internal collection in // https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java#L62-L124 -func addClusterStateFields(indexName string, indexStats, clusterState common.MapStr) error { - indexMetadata, err := getClusterStateMetricForIndex(clusterState, indexName, "metadata") +func addClusterStateFields(idx *index, clusterState common.MapStr) error { + indexMetadata, err := getClusterStateMetricForIndex(clusterState, idx.Index, "metadata") if err != nil { return errors.Wrap(err, "failed to get index metadata from cluster state") } - indexRoutingTable, err := getClusterStateMetricForIndex(clusterState, indexName, "routing_table") + indexRoutingTable, err := getClusterStateMetricForIndex(clusterState, idx.Index, "routing_table") if err != nil { return errors.Wrap(err, "failed to get index routing table from cluster state") } @@ -163,7 +184,7 @@ func addClusterStateFields(indexName string, indexStats, clusterState common.Map if err != nil { return errors.Wrap(err, "failed to get index creation time") } - indexStats.Put("created", created) + idx.Created = created // "index_stats.version.created", <--- don't think this is being used in the UI, so can we skip it? // "index_stats.version.upgraded", <--- don't think this is being used in the UI, so can we skip it? @@ -172,13 +193,13 @@ func addClusterStateFields(indexName string, indexStats, clusterState common.Map if err != nil { return errors.Wrap(err, "failed to get index status") } - indexStats.Put("status", status) + idx.Status = status shardStats, err := getIndexShardStats(shards) if err != nil { return errors.Wrap(err, "failed to get index shard stats") } - indexStats.Put("shards", shardStats) + idx.Shards = *shardStats return nil } @@ -241,7 +262,7 @@ func getIndexStatus(shards map[string]interface{}) (string, error) { return "red", nil } -func getIndexShardStats(shards common.MapStr) (common.MapStr, error) { +func getIndexShardStats(shards common.MapStr) (*shardStats, error) { primaries := 0 replicas := 0 @@ -298,21 +319,18 @@ func getIndexShardStats(shards common.MapStr) (common.MapStr, error) { } } - return common.MapStr{ - "total": primaries + replicas, - "primaries": primaries, - "replicas": replicas, - - "active_total": activePrimaries + activeReplicas, - "active_primaries": activePrimaries, - "active_replicas": activeReplicas, - - "unassigned_total": unassignedPrimaries + unassignedReplicas, - "unassigned_primaries": unassignedPrimaries, - "unassigned_replicas": unassignedReplicas, - - "initializing": initializing, - "relocating": relocating, + return &shardStats{ + Total: primaries + replicas, + Primaries: primaries, + Replicas: replicas, + ActiveTotal: activePrimaries + activeReplicas, + ActivePrimaries: activePrimaries, + ActiveReplicas: activeReplicas, + UnassignedTotal: unassignedPrimaries + unassignedReplicas, + UnassignedPrimaries: unassignedPrimaries, + UnassignedReplicas: unassignedReplicas, + Initializing: initializing, + Relocating: relocating, }, nil } diff --git a/metricbeat/module/elasticsearch/index/data_xpack_test.go b/metricbeat/module/elasticsearch/index/data_xpack_test.go index 749467ef72d..1bdc790d9ae 100644 --- a/metricbeat/module/elasticsearch/index/data_xpack_test.go +++ b/metricbeat/module/elasticsearch/index/data_xpack_test.go @@ -31,10 +31,10 @@ func BenchmarkParseAPIResponse(b *testing.B) { content, err := ioutil.ReadFile("_meta/test/stats.800.bench.json") require.NoError(b, err) - var indicesStruct IndicesStruct + var indicesStats stats for i := 0; i < b.N; i++ { - err = parseAPIResponse(content, &indicesStruct) + err = parseAPIResponse(content, &indicesStats) require.NoError(b, err) } diff --git a/metricbeat/module/elasticsearch/index/index.go b/metricbeat/module/elasticsearch/index/index.go index cdf7ddd02d8..c639f470de2 100644 --- a/metricbeat/module/elasticsearch/index/index.go +++ b/metricbeat/module/elasticsearch/index/index.go @@ -35,7 +35,7 @@ func init() { const ( statsMetrics = "docs,fielddata,indexing,merge,search,segments,store,refresh,query_cache,request_cache" - statsPath = "/_stats/" + statsMetrics + statsPath = "/_stats/" + statsMetrics + "?filter_path=indices" ) // MetricSet type defines all fields of the MetricSet