diff --git a/README.md b/README.md index 5576cbc2..30cc5ab4 100644 --- a/README.md +++ b/README.md @@ -151,6 +151,7 @@ Further Information | elasticsearch_indices_segments_count | gauge | 1 | Count of index segments on this node | elasticsearch_indices_segments_memory_bytes | gauge | 1 | Current memory size of segments in bytes | elasticsearch_indices_settings_stats_read_only_indices | gauge | 1 | Count of indices that have read_only_allow_delete=true +| elasticsearch_indices_mappings_stats_number_of_fields | gauge | 1 | Count of fields | elasticsearch_indices_shards_docs | gauge | 3 | Count of documents on this shard | elasticsearch_indices_shards_docs_deleted | gauge | 3 | Count of deleted documents on each shard | elasticsearch_indices_store_size_bytes | gauge | 1 | Current size of stored index data in bytes diff --git a/collector/indices_mappings.go b/collector/indices_mappings.go new file mode 100644 index 00000000..4b42ed8d --- /dev/null +++ b/collector/indices_mappings.go @@ -0,0 +1,182 @@ +package collector + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "path" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + defaultIndicesMappingsLabels = []string{"index"} +) + +type indicesMappingsMetric struct { + Type prometheus.ValueType + Desc *prometheus.Desc + Value func(indexMapping IndexMapping) float64 +} + +// IndicesMappings information struct +type IndicesMappings struct { + logger log.Logger + client *http.Client + url *url.URL + + up prometheus.Gauge + totalScrapes, jsonParseFailures prometheus.Counter + + metrics []*indicesMappingsMetric +} + +// NewIndicesMappings defines Indices Mappings Prometheus metrics +func NewIndicesMappings(logger log.Logger, client *http.Client, url *url.URL) *IndicesMappings { + subsystem := "indices_mappings_stats" + + return &IndicesMappings{ + logger: logger, + client: client, + url: url, + + up: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: prometheus.BuildFQName(namespace, subsystem, "up"), + Help: "Was the last scrape of the ElasticSearch Indices Mappings endpoint successful.", + }), + totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{ + Name: prometheus.BuildFQName(namespace, subsystem, "total_scrapes"), + Help: "Current total ElasticSearch Indices Mappings scrapes.", + }), + jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{ + Name: prometheus.BuildFQName(namespace, subsystem, "json_parse_failures"), + Help: "Number of errors while parsing JSON.", + }), + metrics: []*indicesMappingsMetric{ + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, subsystem, "number_of_fields"), + "Current number fields within cluster.", + defaultIndicesMappingsLabels, nil, + ), + Value: func(indexMapping IndexMapping) float64 { + return countFieldsRecursive(indexMapping.Mappings.Properties, 0) + }, + }, + }, + } +} + +func countFieldsRecursive(properties Properties, fieldCounter float64) float64 { + // iterate over all properties + for _, property := range properties { + if property.Type != nil { + // property has a type set - counts as a field + fieldCounter++ + + // iterate over all fields of that property + for _, field := range property.Fields { + // field has a type set - counts as a field + if field.Type != nil { + fieldCounter++ + } + } + } + + // count recursively in case the property has more properties + if property.Properties != nil { + fieldCounter = +countFieldsRecursive(property.Properties, fieldCounter) + } + } + + return fieldCounter +} + +// Describe add Snapshots metrics descriptions +func (im *IndicesMappings) Describe(ch chan<- *prometheus.Desc) { + for _, metric := range im.metrics { + ch <- metric.Desc + } + + ch <- im.up.Desc() + ch <- im.totalScrapes.Desc() + ch <- im.jsonParseFailures.Desc() +} + +func (im *IndicesMappings) getAndParseURL(u *url.URL, data interface{}) error { + res, err := im.client.Get(u.String()) + if err != nil { + return fmt.Errorf("failed to get from %s://%s:%s%s: %s", + u.Scheme, u.Hostname(), u.Port(), u.Path, err) + } + + defer func() { + err = res.Body.Close() + if err != nil { + _ = level.Warn(im.logger).Log( + "msg", "failed to close http.Client", + "err", err, + ) + } + }() + + if res.StatusCode != http.StatusOK { + return fmt.Errorf("HTTP Request failed with code %d", res.StatusCode) + } + + if err := json.NewDecoder(res.Body).Decode(data); err != nil { + im.jsonParseFailures.Inc() + return err + } + return nil +} + +func (im *IndicesMappings) fetchAndDecodeIndicesMappings() (IndicesMappingsResponse, error) { + + u := *im.url + u.Path = path.Join(u.Path, "/_all/_mappings") + var imr IndicesMappingsResponse + err := im.getAndParseURL(&u, &imr) + if err != nil { + return imr, err + } + + return imr, err +} + +// Collect gets all indices mappings metric values +func (im *IndicesMappings) Collect(ch chan<- prometheus.Metric) { + + im.totalScrapes.Inc() + defer func() { + ch <- im.up + ch <- im.totalScrapes + ch <- im.jsonParseFailures + }() + + indicesMappingsResponse, err := im.fetchAndDecodeIndicesMappings() + if err != nil { + im.up.Set(0) + _ = level.Warn(im.logger).Log( + "msg", "failed to fetch and decode cluster mappings stats", + "err", err, + ) + return + } + im.up.Set(1) + + for _, metric := range im.metrics { + for indexName, mappings := range indicesMappingsResponse { + ch <- prometheus.MustNewConstMetric( + metric.Desc, + metric.Type, + metric.Value(mappings), + indexName, + ) + } + } +} diff --git a/collector/indices_mappings_response.go b/collector/indices_mappings_response.go new file mode 100644 index 00000000..0f1c1908 --- /dev/null +++ b/collector/indices_mappings_response.go @@ -0,0 +1,34 @@ +package collector + +// IndicesMappingsResponse is a representation of elasticsearch mappings for each index +type IndicesMappingsResponse map[string]IndexMapping + +// IndexMapping defines the struct of the tree for the mappings of each index +type IndexMapping struct { + Mappings Mappings `json:"mappings"` +} + +// Mappings defines all index mappings +type Mappings struct { + Properties Properties `json:"properties"` +} + +// Properties defines all the properties of the current mapping +type Properties map[string]*Property + +// Fields defines all the fields of the current mapping +type Fields map[string]*Field + +// Property defines a single property of the current index properties +type Property struct { + Type *string `json:"type"` + Properties Properties `json:"properties"` + Fields Fields `json:"fields"` +} + +// Field defines a single property of the current index field +type Field struct { + Type *string `json:"type"` + Properties Properties `json:"properties"` + Fields Fields `json:"fields"` +} \ No newline at end of file diff --git a/collector/indices_mappings_test.go b/collector/indices_mappings_test.go new file mode 100644 index 00000000..bbc9b44b --- /dev/null +++ b/collector/indices_mappings_test.go @@ -0,0 +1,147 @@ +package collector + +import ( + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/go-kit/kit/log" +) + +func TestMapping(t *testing.T) { + // Testcases created using: + // docker run -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.8.0 + // curl -XPUT http://localhost:9200/twitter + // curl -XPUT http://localhost:9200/facebook + /* curl -XPUT http://localhost:9200/twitter/_mapping -H 'Content-Type: application/json' -d'{ + "properties": { + "email": { + "type": "keyword" + }, + "phone": { + "type": "keyword" + } + } + }'*/ + /* curl -XPUT http://localhost:9200/facebook/_mapping -H 'Content-Type: application/json' -d'{ + "properties": { + "name": { + "type": "text", + "fields": { + "raw": { + "type": "keyword" + } + } + }, + "contact": { + "properties": { + "email": { + "type": "text", + "fields": { + "raw": { + "type": "keyword" + } + } + }, + "phone": { + "type": "text" + } + } + } + } + }'*/ + // curl http://localhost:9200/_all/_mapping + tcs := map[string]string{ + "7.8.0": `{ + "facebook": { + "mappings": { + "properties": { + "contact": { + "properties": { + "email": { + "type": "text", + "fields": { + "raw": { + "type": "keyword" + } + } + }, + "phone": { + "type": "text" + } + } + }, + "name": { + "type": "text", + "fields": { + "raw": { + "type": "keyword" + } + } + } + } + } + }, + "twitter": { + "mappings": { + "properties": { + "email": { + "type": "keyword" + }, + "phone": { + "type": "keyword" + } + } + } + } + }`, + } + for ver, out := range tcs { + for hn, handler := range map[string]http.Handler{ + "plain": http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, out) + }), + } { + ts := httptest.NewServer(handler) + defer ts.Close() + + u, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("Failed to parse URL: %s", err) + } + c := NewIndicesMappings(log.NewNopLogger(), http.DefaultClient, u) + imr, err := c.fetchAndDecodeIndicesMappings() + if err != nil { + t.Fatalf("Failed to fetch or decode indices mappings: %s", err) + } + t.Logf("[%s/%s] All Indices Mappings Response: %+v", hn, ver, imr) + + if *imr["facebook"].Mappings.Properties["contact"].Properties["phone"].Type != "text" { + t.Errorf("Marshalling error at facebook.contact.phone") + } + + if *imr["facebook"].Mappings.Properties["contact"].Properties["email"].Fields["raw"].Type != "keyword" { + t.Errorf("Marshalling error at facebook.contact.email.raw") + } + + if *imr["facebook"].Mappings.Properties["name"].Type != "text" { + t.Errorf("Marshalling error at facebook.name") + } + + if *imr["facebook"].Mappings.Properties["name"].Fields["raw"].Type != "keyword" { + t.Errorf("Marshalling error at facebook.name.raw") + } + + if *imr["twitter"].Mappings.Properties["email"].Type != "keyword" { + t.Errorf("Marshalling error at twitter.email") + } + + if *imr["twitter"].Mappings.Properties["phone"].Type != "keyword" { + t.Errorf("Marshalling error at twitter.phone") + } + + } + } +} diff --git a/main.go b/main.go index b988f128..878fa23e 100644 --- a/main.go +++ b/main.go @@ -58,6 +58,9 @@ func main() { esExportIndicesSettings = kingpin.Flag("es.indices_settings", "Export stats for settings of all indices of the cluster."). Default("false").Envar("ES_INDICES_SETTINGS").Bool() + esExportIndicesMappings = kingpin.Flag("es.indices_mappings", + "Export stats for mappings of all indices of the cluster."). + Default("false").Envar("ES_INDICES_MAPPINGS").Bool() esExportClusterSettings = kingpin.Flag("es.cluster_settings", "Export stats for cluster settings."). Default("false").Envar("ES_CLUSTER_SETTINGS").Bool() @@ -150,6 +153,10 @@ func main() { prometheus.MustRegister(collector.NewIndicesSettings(logger, httpClient, esURL)) } + if *esExportIndicesMappings { + prometheus.MustRegister(collector.NewIndicesMappings(logger, httpClient, esURL)) + } + // create a http server server := &http.Server{}