Skip to content

Commit

Permalink
Add kubernetes apiserver metricset (elastic#7059)
Browse files Browse the repository at this point in the history
This new metricset gathers metrics from Kubernetes API servers, giving visibility on number of requests and latency.

This change adds support for `summary` and `histogram` metric type to our Prometheus helper, histogram is used to retrieve latency info.

I also simplified the process of creating a new metricset based on this helper, to the point you only need [this](https://github.com/exekias/beats/blob/5f4568208ac273d9b759343b22f783184951a477/metricbeat/module/kubernetes/apiserver/apiserver.go) to create it and [this](https://github.com/exekias/beats/blob/5f4568208ac273d9b759343b22f783184951a477/metricbeat/module/kubernetes/apiserver/apiserver_test.go) to test it.
  • Loading branch information
exekias authored and ruflin committed May 17, 2018
1 parent a1def32 commit b1f1b5f
Show file tree
Hide file tree
Showing 20 changed files with 20,375 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Add Elasticsearch index_summary metricset. {pull}6918[6918]
- Add config option `management_path_prefix` for RabbitMQ module to configure management plugin path prefix {issue}6875[6875] {pull}7074[7074]
- Add shard metricset to Elasticsearch module. {pull}7006[7006]
- Add apiserver metricset to Kubernetes module. {pull}7059[7059]

*Packetbeat*

Expand Down
6 changes: 3 additions & 3 deletions libbeat/template/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,12 @@ func (p *Processor) object(f *common.Field) common.MapStr {
dynProperties["index"] = "analyzed"
}
addDynamicTemplate(f, dynProperties, matchType("string"))
case "long":
dynProperties["type"] = f.ObjectType
addDynamicTemplate(f, dynProperties, matchType("long"))
case "keyword":
dynProperties["type"] = f.ObjectType
addDynamicTemplate(f, dynProperties, matchType("string"))
case "long", "double":
dynProperties["type"] = f.ObjectType
addDynamicTemplate(f, dynProperties, matchType(f.ObjectType))
}

properties := getDefaultProperties(f)
Expand Down
6 changes: 6 additions & 0 deletions libbeat/template/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ func TestProcessor(t *testing.T) {
"type": "long", "doc_values": false,
},
},
{
output: p.other(&common.Field{Type: "double", DocValues: &falseVar}),
expected: common.MapStr{
"type": "double", "doc_values": false,
},
},
{
output: p.other(&common.Field{Type: "text", DocValues: &trueVar}),
expected: common.MapStr{
Expand Down
97 changes: 97 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -6564,6 +6564,103 @@ Information and statistics of pods managed by kubernetes.
[float]
== apiserver fields
Kubernetes API server metrics
*`kubernetes.apiserver.request.client`*::
+
--
type: keyword
Client doing the requests
--
*`kubernetes.apiserver.request.resource`*::
+
--
type: keyword
Requested resource
--
*`kubernetes.apiserver.request.subresource`*::
+
--
type: keyword
Requested subresource
--
*`kubernetes.apiserver.request.scope`*::
+
--
type: keyword
Request scope (cluster, namespace, resource)
--
*`kubernetes.apiserver.request.verb`*::
+
--
type: keyword
Request HTTP verb
--
*`kubernetes.apiserver.request.count`*::
+
--
type: long
Total number of requests
--
*`kubernetes.apiserver.request.latency.sum`*::
+
--
type: long
Requests latency, sum of latencies in microseconds
--
*`kubernetes.apiserver.request.latency.count`*::
+
--
type: long
Request latency, number of requests
--
*`kubernetes.apiserver.request.latency.bucket`*::
+
--
type: object
Request latency histagram buckets
--
[float]
== container fields
Expand Down
7 changes: 7 additions & 0 deletions metricbeat/docs/modules/kubernetes.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ metricbeat.modules:
enabled: true
metricsets:
- event
# Kubernetes API server
- module: kubernetes
enabled: true
metricsets:
- apiserver
hosts: ["https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT}"]
----

This module supports TLS connection when using `ssl` config field, as described in <<configuration-ssl>>.
Expand Down
47 changes: 47 additions & 0 deletions metricbeat/helper/prometheus/metric.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package prometheus

import (
"math"
"strconv"
"strings"

"github.com/elastic/beats/libbeat/common"

dto "github.com/prometheus/client_model/go"
)

Expand Down Expand Up @@ -74,6 +78,49 @@ func (m *commonMetric) GetValue(metric *dto.Metric) interface{} {
return gauge.GetValue()
}

summary := metric.GetSummary()
if summary != nil {
value := common.MapStr{}
value["sum"] = summary.GetSampleSum()
value["count"] = summary.GetSampleCount()

quantiles := summary.GetQuantile()
percentileMap := common.MapStr{}
for _, quantile := range quantiles {
if !math.IsNaN(quantile.GetValue()) {
key := strconv.FormatFloat((100 * quantile.GetQuantile()), 'f', -1, 64)
percentileMap[key] = quantile.GetValue()
}

}

if len(percentileMap) != 0 {
value["percentile"] = percentileMap
}

return value
}

histogram := metric.GetHistogram()
if histogram != nil {
value := common.MapStr{}
value["sum"] = histogram.GetSampleSum()
value["count"] = histogram.GetSampleCount()

buckets := histogram.GetBucket()
bucketMap := common.MapStr{}
for _, bucket := range buckets {
key := strconv.FormatFloat(bucket.GetUpperBound(), 'f', -1, 64)
bucketMap[key] = bucket.GetCumulativeCount()
}

if len(bucketMap) != 0 {
value["bucket"] = bucketMap
}

return value
}

// Other types are not supported here
return nil
}
Expand Down
44 changes: 44 additions & 0 deletions metricbeat/helper/prometheus/module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package prometheus

import (
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
)

const (
defaultScheme = "http"
defaultPath = "/metrics"
)

var (
// HostParser validates Prometheus URLs
HostParser = parse.URLHostParserBuilder{
DefaultScheme: defaultScheme,
DefaultPath: defaultPath,
}.Build()
)

// MetricSetBuilder returns a builder function for a new Prometheus metricset using the given mapping
func MetricSetBuilder(mapping *MetricsMapping) func(base mb.BaseMetricSet) (mb.MetricSet, error) {
return func(base mb.BaseMetricSet) (mb.MetricSet, error) {
prometheus, err := NewPrometheusClient(base)
if err != nil {
return nil, err
}
return &prometheusMetricSet{
BaseMetricSet: base,
prometheus: prometheus,
mapping: mapping,
}, nil
}
}

type prometheusMetricSet struct {
mb.BaseMetricSet
prometheus Prometheus
mapping *MetricsMapping
}

func (m *prometheusMetricSet) Fetch(r mb.ReporterV2) {
m.prometheus.ReportProcessedMetrics(m.mapping, r)
}
16 changes: 15 additions & 1 deletion metricbeat/helper/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type Prometheus interface {
GetFamilies() ([]*dto.MetricFamily, error)

GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error)

ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2)
}

type prometheus struct {
Expand Down Expand Up @@ -138,10 +140,22 @@ func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapS
for k, v := range mapping.ExtraFields {
event[k] = v
}

events = append(events, event)

}
return events, nil

}

func (p *prometheus) ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) {
events, err := p.GetProcessedMetrics(mapping)
if err != nil {
r.Error(err)
return
}
for _, event := range events {
r.Event(mb.Event{MetricSetFields: event})
}
}

func getEvent(m map[string]common.MapStr, labels common.MapStr) common.MapStr {
Expand Down
76 changes: 72 additions & 4 deletions metricbeat/helper/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,31 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
mbtest "github.com/elastic/beats/metricbeat/mb/testing"
)

const promMetrics = `
# TYPE first_metric gauge
first_metric{label1="value1",label2="value2",label3="value3"} 1
# TYPE second_metric gauge
second_metric{label1="value1",label3="othervalue"} 0
# TYPE summary_metric summary
summary_metric{quantile="0.5"} 29735
summary_metric{quantile="0.9"} 47103
summary_metric{quantile="0.99"} 50681
summary_metric_sum 234892394
summary_metric_count 44000
# TYPE histogram_metric histogram
histogram_metric_bucket{le="1000"} 1
histogram_metric_bucket{le="10000"} 1
histogram_metric_bucket{le="100000"} 1
histogram_metric_bucket{le="1e+06"} 1
histogram_metric_bucket{le="1e+08"} 1
histogram_metric_bucket{le="1e+09"} 1
histogram_metric_bucket{le="+Inf"} 1
histogram_metric_sum 117
histogram_metric_count 1
`

type mockFetcher struct{}
Expand Down Expand Up @@ -175,15 +193,65 @@ func TestPrometheus(t *testing.T) {
},
},
},
{
msg: "Summary metric",
mapping: &MetricsMapping{
Metrics: map[string]MetricMap{
"summary_metric": Metric("summary.metric"),
},
},
expected: []common.MapStr{
common.MapStr{
"summary.metric": common.MapStr{
"sum": 234892394.0,
"count": uint64(44000),
"percentile": common.MapStr{
"50": 29735.0,
"90": 47103.0,
"99": 50681.0,
},
},
},
},
},
{
msg: "Histogram metric",
mapping: &MetricsMapping{
Metrics: map[string]MetricMap{
"histogram_metric": Metric("histogram.metric"),
},
},
expected: []common.MapStr{
common.MapStr{
"histogram.metric": common.MapStr{
"count": uint64(1),
"bucket": common.MapStr{
"1000000000": uint64(1),
"+Inf": uint64(1),
"1000": uint64(1),
"10000": uint64(1),
"100000": uint64(1),
"1000000": uint64(1),
"100000000": uint64(1),
},
"sum": 117.0,
},
},
},
},
}

for _, test := range tests {
res, err := p.GetProcessedMetrics(test.mapping)
assert.Nil(t, err, test.msg)
reporter := &mbtest.CapturingReporterV2{}
p.ReportProcessedMetrics(test.mapping, reporter)
assert.Nil(t, reporter.GetErrors(), test.msg)
// Sort slice to avoid randomness
res := reporter.GetEvents()
sort.Slice(res, func(i, j int) bool {
return res[i].String() < res[j].String()
return res[i].MetricSetFields.String() < res[j].MetricSetFields.String()
})
assert.Equal(t, test.expected, res, test.msg)
for j, ev := range res {
assert.Equal(t, test.expected[j], ev.MetricSetFields, test.msg)
}
}
}
Loading

0 comments on commit b1f1b5f

Please sign in to comment.