Skip to content

Commit

Permalink
Reduce memory usage of elasticsearch.index metricset (#16538)
Browse files Browse the repository at this point in the history
* [Debugging only!] Add charts to see memory usage over time.

THIS COMMIT MUST BE REMOVED AT THE END!!!

* [Debugging only] Disable validation so we can test only index metricset

* Streaming parser

* Removing debugcharts

* Replace maps with structs

* Running go mod tidy

* Pass pointer

* Uncomment code

* Reverting unnecessary changes

* Adding CHANGELOG entry

* Incorporating benchmark test

* Removing unnecessary nil check
  • Loading branch information
ycombinator authored Mar 17, 2020
1 parent d9c83df commit be9e426
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 99 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix imports after PR was merged before rebase. {pull}16756[16756]
- Add dashboard for `redisenterprise` module. {pull}16752[16752]
- Dynamically choose a method for the system/service metricset to support older linux distros. {pull}16902[16902]
- Reduce memory usage in `elasticsearch/index` metricset. {issue}16503[16503] {pull}16538[16538]

*Packetbeat*

Expand Down
210 changes: 114 additions & 96 deletions metricbeat/module/elasticsearch/index/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,96 +27,117 @@ import (
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
s "github.com/elastic/beats/v7/libbeat/common/schema"
c "github.com/elastic/beats/v7/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/v7/metricbeat/helper/elastic"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/elasticsearch"
)

var (
// Based on https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java#L127-L203
xpackSchema = s.Schema{
"uuid": c.Str("uuid"),
"primaries": c.Dict("primaries", indexStatsSchema),
"total": c.Dict("total", indexStatsSchema),
}
errParse = errors.New("failure parsing Indices Stats Elasticsearch API response")
)

indexStatsSchema = s.Schema{
"docs": c.Dict("docs", s.Schema{
"count": c.Int("count"),
}),
"fielddata": c.Dict("fielddata", s.Schema{
"memory_size_in_bytes": c.Int("memory_size_in_bytes"),
"evictions": c.Int("evictions"),
}),
"indexing": c.Dict("indexing", s.Schema{
"index_total": c.Int("index_total"),
"index_time_in_millis": c.Int("index_time_in_millis"),
"throttle_time_in_millis": c.Int("throttle_time_in_millis"),
}),
"merges": c.Dict("merges", s.Schema{
"total_size_in_bytes": c.Int("total_size_in_bytes"),
}),
"query_cache": c.Dict("query_cache", cacheStatsSchema),
"request_cache": c.Dict("request_cache", cacheStatsSchema),
"search": c.Dict("search", s.Schema{
"query_total": c.Int("query_total"),
"query_time_in_millis": c.Int("query_time_in_millis"),
}),
"segments": c.Dict("segments", s.Schema{
"count": c.Int("count"),
"memory_in_bytes": c.Int("memory_in_bytes"),
"terms_memory_in_bytes": c.Int("terms_memory_in_bytes"),
"stored_fields_memory_in_bytes": c.Int("stored_fields_memory_in_bytes"),
"term_vectors_memory_in_bytes": c.Int("term_vectors_memory_in_bytes"),
"norms_memory_in_bytes": c.Int("norms_memory_in_bytes"),
"points_memory_in_bytes": c.Int("points_memory_in_bytes"),
"doc_values_memory_in_bytes": c.Int("doc_values_memory_in_bytes"),
"index_writer_memory_in_bytes": c.Int("index_writer_memory_in_bytes"),
"version_map_memory_in_bytes": c.Int("version_map_memory_in_bytes"),
"fixed_bit_set_memory_in_bytes": c.Int("fixed_bit_set_memory_in_bytes"),
}),
"store": c.Dict("store", s.Schema{
"size_in_bytes": c.Int("size_in_bytes"),
}),
"refresh": c.Dict("refresh", s.Schema{
"external_total_time_in_millis": c.Int("external_total_time_in_millis", s.Optional),
"total_time_in_millis": c.Int("total_time_in_millis"),
}),
}
// Based on https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java#L127-L203
type stats struct {
Indices map[string]index `json:"indices"`
}

cacheStatsSchema = s.Schema{
"memory_size_in_bytes": c.Int("memory_size_in_bytes"),
"evictions": c.Int("evictions"),
"hit_count": c.Int("hit_count"),
"miss_count": c.Int("miss_count"),
}
)
type index struct {
UUID string `json:"uuid"`
Primaries indexStats `json:"primaries"`
Total indexStats `json:"total"`

func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error {
var indicesStruct IndicesStruct
if err := parseAPIResponse(content, &indicesStruct); err != nil {
return errors.Wrap(err, "failure parsing Indices Stats Elasticsearch API response")
}
Index string `json:"index"`
Created int64 `json:"created"`
Status string `json:"status"`
Shards shardStats `json:"shards"`
}

type indexStats struct {
Docs struct {
Count int `json:"count"`
} `json:"docs"`
FieldData struct {
MemorySizeInBytes int `json:"memory_size_in_bytes"`
Evictions int `json:"evictions"`
} `json:"fielddata"`
Indexing struct {
IndexTotal int `json:"index_total"`
IndexTimeInMillis int `json:"index_time_in_millis"`
ThrottleTimeInMillis int `json:"throttle_time_in_millis"`
} `json:"indexing"`
Merges struct {
TotalSizeInBytes int `json:"total_size_in_bytes"`
} `json:"merges"`
QueryCache cacheStats `json:"query_stats"`
RequestCache cacheStats `json:"request_cache"`
Search struct {
QueryTotal int `json:"query_total"`
QueryTimeInMillis int `json:"query_time_in_millis"`
} `json:"search"`
Segments struct {
Count int `json:"count"`
MemoryInBytes int `json:"memory_in_bytes"`
TermsMemoryInBytes int `json:"terms_memory_in_bytes"`
StoredFieldsMemoryInBytes int `json:"stored_fields_memory_in_bytes"`
TermVectorsMemoryInBytes int `json:"term_vectors_memory_in_bytes"`
NormsMemoryInBytes int `json:"norms_memory_in_bytes"`
PointsMemoryInBytes int `json:"points_memory_in_bytes"`
DocValuesMemoryInBytes int `json:"doc_values_memory_in_bytes"`
IndexWriterMemoryInBytes int `json:"index_writer_memory_in_bytes"`
VersionMapMemoryInBytes int `json:"version_map_memory_in_bytes"`
FixedBitSetMemoryInBytes int `json:"fixed_bit_set_memory_in_bytes"`
} `json:"segments"`
Store struct {
SizeInBytes int `json:"size_in_bytes"`
} `json:"store"`
Refresh struct {
ExternalTotalTimeInMillis int `json:"external_total_time_in_millis"`
TotalTimeInMillis int `json:"total_time_in_millis"`
} `json:"refresh"`
}

type cacheStats struct {
MemorySizeInBytes int `json:"memory_size_in_bytes"`
Evictions int `json:"evictions"`
HitCount int `json:"hit_count"`
MissCount int `json:"miss_count"`
}

type shardStats struct {
Total int `json:"total"`
Primaries int `json:"primaries"`
Replicas int `json:"replicas"`

ActiveTotal int `json:"active_total"`
ActivePrimaries int `json:"active_primaries"`
ActiveReplicas int `json:"active_replicas"`

UnassignedTotal int `json:"unassigned_total"`
UnassignedPrimaries int `json:"unassigned_primaries"`
UnassignedReplicas int `json:"unassigned_replicas"`

Initializing int `json:"initializing"`
Relocating int `json:"relocationg"`
}

func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error {
clusterStateMetrics := []string{"metadata", "routing_table"}
clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI(), clusterStateMetrics)
if err != nil {
return errors.Wrap(err, "failure retrieving cluster state from Elasticsearch")
}

var indicesStats stats
if err := parseAPIResponse(content, &indicesStats); err != nil {
return errors.Wrap(err, "failure parsing Indices Stats Elasticsearch API response")
}

var errs multierror.Errors
for name, index := range indicesStruct.Indices {
for name, idx := range indicesStats.Indices {
event := mb.Event{}
indexStats, err := xpackSchema.Apply(index)
if err != nil {
errs = append(errs, errors.Wrap(err, "failure applying index stats schema"))
continue
}
indexStats["index"] = name
idx.Index = name

err = addClusterStateFields(name, indexStats, clusterState)
err = addClusterStateFields(&idx, clusterState)
if err != nil {
errs = append(errs, errors.Wrap(err, "failure adding cluster state fields"))
continue
Expand All @@ -127,7 +148,7 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info,
"timestamp": common.Time(time.Now()),
"interval_ms": m.Module().Config().Period / time.Millisecond,
"type": "index_stats",
"index_stats": indexStats,
"index_stats": idx,
}

event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch)
Expand All @@ -137,19 +158,19 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info,
return errs.Err()
}

func parseAPIResponse(content []byte, indicesStruct *IndicesStruct) error {
return json.Unmarshal(content, indicesStruct)
func parseAPIResponse(content []byte, indicesStats *stats) error {
return json.Unmarshal(content, indicesStats)
}

// Fields added here are based on same fields being added by internal collection in
// https://github.com/elastic/elasticsearch/blob/master/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDoc.java#L62-L124
func addClusterStateFields(indexName string, indexStats, clusterState common.MapStr) error {
indexMetadata, err := getClusterStateMetricForIndex(clusterState, indexName, "metadata")
func addClusterStateFields(idx *index, clusterState common.MapStr) error {
indexMetadata, err := getClusterStateMetricForIndex(clusterState, idx.Index, "metadata")
if err != nil {
return errors.Wrap(err, "failed to get index metadata from cluster state")
}

indexRoutingTable, err := getClusterStateMetricForIndex(clusterState, indexName, "routing_table")
indexRoutingTable, err := getClusterStateMetricForIndex(clusterState, idx.Index, "routing_table")
if err != nil {
return errors.Wrap(err, "failed to get index routing table from cluster state")
}
Expand All @@ -163,7 +184,7 @@ func addClusterStateFields(indexName string, indexStats, clusterState common.Map
if err != nil {
return errors.Wrap(err, "failed to get index creation time")
}
indexStats.Put("created", created)
idx.Created = created

// "index_stats.version.created", <--- don't think this is being used in the UI, so can we skip it?
// "index_stats.version.upgraded", <--- don't think this is being used in the UI, so can we skip it?
Expand All @@ -172,13 +193,13 @@ func addClusterStateFields(indexName string, indexStats, clusterState common.Map
if err != nil {
return errors.Wrap(err, "failed to get index status")
}
indexStats.Put("status", status)
idx.Status = status

shardStats, err := getIndexShardStats(shards)
if err != nil {
return errors.Wrap(err, "failed to get index shard stats")
}
indexStats.Put("shards", shardStats)
idx.Shards = *shardStats
return nil
}

Expand Down Expand Up @@ -241,7 +262,7 @@ func getIndexStatus(shards map[string]interface{}) (string, error) {
return "red", nil
}

func getIndexShardStats(shards common.MapStr) (common.MapStr, error) {
func getIndexShardStats(shards common.MapStr) (*shardStats, error) {
primaries := 0
replicas := 0

Expand Down Expand Up @@ -298,21 +319,18 @@ func getIndexShardStats(shards common.MapStr) (common.MapStr, error) {
}
}

return common.MapStr{
"total": primaries + replicas,
"primaries": primaries,
"replicas": replicas,

"active_total": activePrimaries + activeReplicas,
"active_primaries": activePrimaries,
"active_replicas": activeReplicas,

"unassigned_total": unassignedPrimaries + unassignedReplicas,
"unassigned_primaries": unassignedPrimaries,
"unassigned_replicas": unassignedReplicas,

"initializing": initializing,
"relocating": relocating,
return &shardStats{
Total: primaries + replicas,
Primaries: primaries,
Replicas: replicas,
ActiveTotal: activePrimaries + activeReplicas,
ActivePrimaries: activePrimaries,
ActiveReplicas: activeReplicas,
UnassignedTotal: unassignedPrimaries + unassignedReplicas,
UnassignedPrimaries: unassignedPrimaries,
UnassignedReplicas: unassignedReplicas,
Initializing: initializing,
Relocating: relocating,
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/elasticsearch/index/data_xpack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ func BenchmarkParseAPIResponse(b *testing.B) {
content, err := ioutil.ReadFile("_meta/test/stats.800.bench.json")
require.NoError(b, err)

var indicesStruct IndicesStruct
var indicesStats stats

for i := 0; i < b.N; i++ {
err = parseAPIResponse(content, &indicesStruct)
err = parseAPIResponse(content, &indicesStats)
require.NoError(b, err)
}

Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func init() {

const (
statsMetrics = "docs,fielddata,indexing,merge,search,segments,store,refresh,query_cache,request_cache"
statsPath = "/_stats/" + statsMetrics
statsPath = "/_stats/" + statsMetrics + "?filter_path=indices"
)

// MetricSet type defines all fields of the MetricSet
Expand Down

0 comments on commit be9e426

Please sign in to comment.