Skip to content

Commit

Permalink
prometheus-community#257: Add gauge for total fields mapped in an index
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Roest <[email protected]>
  • Loading branch information
msodi authored and repl-mike-roest committed May 31, 2021
1 parent 07c37d0 commit 9ff2e26
Show file tree
Hide file tree
Showing 5 changed files with 371 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ Further Information
| elasticsearch_indices_segments_count | gauge | 1 | Count of index segments on this node
| elasticsearch_indices_segments_memory_bytes | gauge | 1 | Current memory size of segments in bytes
| elasticsearch_indices_settings_stats_read_only_indices | gauge | 1 | Count of indices that have read_only_allow_delete=true
| elasticsearch_indices_mappings_stats_number_of_fields | gauge | 1 | Count of fields
| elasticsearch_indices_shards_docs | gauge | 3 | Count of documents on this shard
| elasticsearch_indices_shards_docs_deleted | gauge | 3 | Count of deleted documents on each shard
| elasticsearch_indices_store_size_bytes | gauge | 1 | Current size of stored index data in bytes
Expand Down
182 changes: 182 additions & 0 deletions collector/indices_mappings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package collector

import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"path"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
)

var (
defaultIndicesMappingsLabels = []string{"index"}
)

type indicesMappingsMetric struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Value func(indexMapping IndexMapping) float64
}

// IndicesMappings information struct
type IndicesMappings struct {
logger log.Logger
client *http.Client
url *url.URL

up prometheus.Gauge
totalScrapes, jsonParseFailures prometheus.Counter

metrics []*indicesMappingsMetric
}

// NewIndicesMappings defines Indices Mappings Prometheus metrics
func NewIndicesMappings(logger log.Logger, client *http.Client, url *url.URL) *IndicesMappings {
subsystem := "indices_mappings_stats"

return &IndicesMappings{
logger: logger,
client: client,
url: url,

up: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName(namespace, subsystem, "up"),
Help: "Was the last scrape of the ElasticSearch Indices Mappings endpoint successful.",
}),
totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
Name: prometheus.BuildFQName(namespace, subsystem, "total_scrapes"),
Help: "Current total ElasticSearch Indices Mappings scrapes.",
}),
jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: prometheus.BuildFQName(namespace, subsystem, "json_parse_failures"),
Help: "Number of errors while parsing JSON.",
}),
metrics: []*indicesMappingsMetric{
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "number_of_fields"),
"Current number fields within cluster.",
defaultIndicesMappingsLabels, nil,
),
Value: func(indexMapping IndexMapping) float64 {
return countFieldsRecursive(indexMapping.Mappings.Properties, 0)
},
},
},
}
}

func countFieldsRecursive(properties Properties, fieldCounter float64) float64 {
// iterate over all properties
for _, property := range properties {
if property.Type != nil {
// property has a type set - counts as a field
fieldCounter++

// iterate over all fields of that property
for _, field := range property.Fields {
// field has a type set - counts as a field
if field.Type != nil {
fieldCounter++
}
}
}

// count recursively in case the property has more properties
if property.Properties != nil {
fieldCounter = +countFieldsRecursive(property.Properties, fieldCounter)
}
}

return fieldCounter
}

// Describe add Snapshots metrics descriptions
func (im *IndicesMappings) Describe(ch chan<- *prometheus.Desc) {
for _, metric := range im.metrics {
ch <- metric.Desc
}

ch <- im.up.Desc()
ch <- im.totalScrapes.Desc()
ch <- im.jsonParseFailures.Desc()
}

func (im *IndicesMappings) getAndParseURL(u *url.URL, data interface{}) error {
res, err := im.client.Get(u.String())
if err != nil {
return fmt.Errorf("failed to get from %s://%s:%s%s: %s",
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
}

defer func() {
err = res.Body.Close()
if err != nil {
_ = level.Warn(im.logger).Log(
"msg", "failed to close http.Client",
"err", err,
)
}
}()

if res.StatusCode != http.StatusOK {
return fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
}

if err := json.NewDecoder(res.Body).Decode(data); err != nil {
im.jsonParseFailures.Inc()
return err
}
return nil
}

func (im *IndicesMappings) fetchAndDecodeIndicesMappings() (IndicesMappingsResponse, error) {

u := *im.url
u.Path = path.Join(u.Path, "/_all/_mappings")
var imr IndicesMappingsResponse
err := im.getAndParseURL(&u, &imr)
if err != nil {
return imr, err
}

return imr, err
}

// Collect gets all indices mappings metric values
func (im *IndicesMappings) Collect(ch chan<- prometheus.Metric) {

im.totalScrapes.Inc()
defer func() {
ch <- im.up
ch <- im.totalScrapes
ch <- im.jsonParseFailures
}()

indicesMappingsResponse, err := im.fetchAndDecodeIndicesMappings()
if err != nil {
im.up.Set(0)
_ = level.Warn(im.logger).Log(
"msg", "failed to fetch and decode cluster mappings stats",
"err", err,
)
return
}
im.up.Set(1)

for _, metric := range im.metrics {
for indexName, mappings := range indicesMappingsResponse {
ch <- prometheus.MustNewConstMetric(
metric.Desc,
metric.Type,
metric.Value(mappings),
indexName,
)
}
}
}
34 changes: 34 additions & 0 deletions collector/indices_mappings_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package collector

// IndicesMappingsResponse is a representation of elasticsearch mappings for each index
type IndicesMappingsResponse map[string]IndexMapping

// IndexMapping defines the struct of the tree for the mappings of each index
type IndexMapping struct {
Mappings Mappings `json:"mappings"`
}

// Mappings defines all index mappings
type Mappings struct {
Properties Properties `json:"properties"`
}

// Properties defines all the properties of the current mapping
type Properties map[string]*Property

// Fields defines all the fields of the current mapping
type Fields map[string]*Field

// Property defines a single property of the current index properties
type Property struct {
Type *string `json:"type"`
Properties Properties `json:"properties"`
Fields Fields `json:"fields"`
}

// Field defines a single property of the current index field
type Field struct {
Type *string `json:"type"`
Properties Properties `json:"properties"`
Fields Fields `json:"fields"`
}
147 changes: 147 additions & 0 deletions collector/indices_mappings_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package collector

import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"

"github.com/go-kit/kit/log"
)

func TestMapping(t *testing.T) {
// Testcases created using:
// docker run -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.8.0
// curl -XPUT http://localhost:9200/twitter
// curl -XPUT http://localhost:9200/facebook
/* curl -XPUT http://localhost:9200/twitter/_mapping -H 'Content-Type: application/json' -d'{
"properties": {
"email": {
"type": "keyword"
},
"phone": {
"type": "keyword"
}
}
}'*/
/* curl -XPUT http://localhost:9200/facebook/_mapping -H 'Content-Type: application/json' -d'{
"properties": {
"name": {
"type": "text",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"contact": {
"properties": {
"email": {
"type": "text",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"phone": {
"type": "text"
}
}
}
}
}'*/
// curl http://localhost:9200/_all/_mapping
tcs := map[string]string{
"7.8.0": `{
"facebook": {
"mappings": {
"properties": {
"contact": {
"properties": {
"email": {
"type": "text",
"fields": {
"raw": {
"type": "keyword"
}
}
},
"phone": {
"type": "text"
}
}
},
"name": {
"type": "text",
"fields": {
"raw": {
"type": "keyword"
}
}
}
}
}
},
"twitter": {
"mappings": {
"properties": {
"email": {
"type": "keyword"
},
"phone": {
"type": "keyword"
}
}
}
}
}`,
}
for ver, out := range tcs {
for hn, handler := range map[string]http.Handler{
"plain": http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, out)
}),
} {
ts := httptest.NewServer(handler)
defer ts.Close()

u, err := url.Parse(ts.URL)
if err != nil {
t.Fatalf("Failed to parse URL: %s", err)
}
c := NewIndicesMappings(log.NewNopLogger(), http.DefaultClient, u)
imr, err := c.fetchAndDecodeIndicesMappings()
if err != nil {
t.Fatalf("Failed to fetch or decode indices mappings: %s", err)
}
t.Logf("[%s/%s] All Indices Mappings Response: %+v", hn, ver, imr)

if *imr["facebook"].Mappings.Properties["contact"].Properties["phone"].Type != "text" {
t.Errorf("Marshalling error at facebook.contact.phone")
}

if *imr["facebook"].Mappings.Properties["contact"].Properties["email"].Fields["raw"].Type != "keyword" {
t.Errorf("Marshalling error at facebook.contact.email.raw")
}

if *imr["facebook"].Mappings.Properties["name"].Type != "text" {
t.Errorf("Marshalling error at facebook.name")
}

if *imr["facebook"].Mappings.Properties["name"].Fields["raw"].Type != "keyword" {
t.Errorf("Marshalling error at facebook.name.raw")
}

if *imr["twitter"].Mappings.Properties["email"].Type != "keyword" {
t.Errorf("Marshalling error at twitter.email")
}

if *imr["twitter"].Mappings.Properties["phone"].Type != "keyword" {
t.Errorf("Marshalling error at twitter.phone")
}

}
}
}
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func main() {
esExportIndicesSettings = kingpin.Flag("es.indices_settings",
"Export stats for settings of all indices of the cluster.").
Default("false").Envar("ES_INDICES_SETTINGS").Bool()
esExportIndicesMappings = kingpin.Flag("es.indices_mappings",
"Export stats for mappings of all indices of the cluster.").
Default("false").Envar("ES_INDICES_MAPPINGS").Bool()
esExportClusterSettings = kingpin.Flag("es.cluster_settings",
"Export stats for cluster settings.").
Default("false").Envar("ES_CLUSTER_SETTINGS").Bool()
Expand Down Expand Up @@ -150,6 +153,10 @@ func main() {
prometheus.MustRegister(collector.NewIndicesSettings(logger, httpClient, esURL))
}

if *esExportIndicesMappings {
prometheus.MustRegister(collector.NewIndicesMappings(logger, httpClient, esURL))
}

// create a http server
server := &http.Server{}

Expand Down

0 comments on commit 9ff2e26

Please sign in to comment.