Skip to content

Commit

Permalink
Update state_node metricset (#7699)
Browse files Browse the repository at this point in the history
* Add options mechanism to Prometheus metrics

* Move lowercase settings to MetricOption scheme

* Apply options to all labels

* Fix `state_node` for latest `kube-state-metrics` versions

* Update CHANGELOG.asciidoc

* Add missing comments
  • Loading branch information
exekias authored and tsg committed Jul 24, 2018
1 parent 1d4eb23 commit 6a25c09
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 141 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Do not report Metricbeat container host as hostname in Kubernetes deployment. {issue}7199[7199]
- Ensure metadata updates don't replace existing pod metrics. {pull}7573[7573]
- Fix kubernetes pct fields reporting. {pull}7677[7677]
- Add support for new `kube_node_status_condition` in Kubernetes `state_node`. {pull}7699[7699]

*Packetbeat*

Expand Down
95 changes: 75 additions & 20 deletions metricbeat/helper/prometheus/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,60 +29,94 @@ import (

// MetricMap defines the mapping from Prometheus metric to a Metricbeat field
type MetricMap interface {
// GetOptions returns the list of metric options
GetOptions() []MetricOption

// GetField returns the resulting field name
GetField() string

// GetValue returns the resulting value
GetValue(m *dto.Metric) interface{}
}

// MetricOption adds settings to Metric objects behavior
type MetricOption interface {
// Process a tuple of field, value and labels from a metric, return the same tuple updated
Process(field string, value interface{}, labels common.MapStr) (string, interface{}, common.MapStr)
}

// OpFilter only processes metrics matching the given filter
func OpFilter(filter map[string]string) MetricOption {
return opFilter{
labels: filter,
}
}

// OpLowercaseValue lowercases the value if it's a string
func OpLowercaseValue() MetricOption {
return opLowercaseValue{}
}

// Metric directly maps a Prometheus metric to a Metricbeat field
func Metric(field string) MetricMap {
func Metric(field string, options ...MetricOption) MetricMap {
return &commonMetric{
field: field,
field: field,
options: options,
}
}

// KeywordMetric maps a Prometheus metric to a Metricbeat field, stores the
// given keyword when source metric value is 1
func KeywordMetric(field, keyword string) MetricMap {
func KeywordMetric(field, keyword string, options ...MetricOption) MetricMap {
return &keywordMetric{
commonMetric{
field: field,
field: field,
options: options,
},
keyword,
}
}

// BooleanMetric maps a Prometheus metric to a Metricbeat field of bool type
func BooleanMetric(field string) MetricMap {
func BooleanMetric(field string, options ...MetricOption) MetricMap {
return &booleanMetric{
commonMetric{
field: field,
field: field,
options: options,
},
}
}

// LabelMetric maps a Prometheus metric to a Metricbeat field, stores the value
// of a given label on it if the gauge value is 1
func LabelMetric(field, label string, lowercase bool) MetricMap {
func LabelMetric(field, label string, options ...MetricOption) MetricMap {
return &labelMetric{
commonMetric{
field: field,
field: field,
options: options,
},
label,
lowercase,
}
}

// InfoMetric obtains info labels from the given metric and puts them
// into events matching all the key labels present in the metric
func InfoMetric() MetricMap {
return &infoMetric{}
func InfoMetric(options ...MetricOption) MetricMap {
return &infoMetric{
commonMetric{
options: options,
},
}
}

type commonMetric struct {
field string
field string
options []MetricOption
}

// GetOptions returns the list of metric options
func (m *commonMetric) GetOptions() []MetricOption {
return m.options
}

// GetField returns the resulting field name
Expand Down Expand Up @@ -176,18 +210,13 @@ func (m *booleanMetric) GetValue(metric *dto.Metric) interface{} {

type labelMetric struct {
commonMetric
label string
lowercase bool
label string
}

// GetValue returns the resulting value
func (m *labelMetric) GetValue(metric *dto.Metric) interface{} {
if gauge := metric.GetGauge(); gauge != nil && gauge.GetValue() == 1 {
value := getLabel(metric, m.label)
if m.lowercase {
return strings.ToLower(value)
}
return value
return getLabel(metric, m.label)
}
return nil
}
Expand All @@ -201,7 +230,9 @@ func getLabel(metric *dto.Metric, name string) string {
return ""
}

type infoMetric struct{}
type infoMetric struct {
commonMetric
}

// GetValue returns the resulting value
func (m *infoMetric) GetValue(metric *dto.Metric) interface{} {
Expand All @@ -212,3 +243,27 @@ func (m *infoMetric) GetValue(metric *dto.Metric) interface{} {
func (m *infoMetric) GetField() string {
return ""
}

type opFilter struct {
labels map[string]string
}

// Process will return nil if labels don't match the filter
func (o opFilter) Process(field string, value interface{}, labels common.MapStr) (string, interface{}, common.MapStr) {
for k, v := range o.labels {
if labels[k] != v {
return "", nil, nil
}
}
return field, value, labels
}

type opLowercaseValue struct{}

// Process will lowercase the given value if it's a string
func (o opLowercaseValue) Process(field string, value interface{}, labels common.MapStr) (string, interface{}, common.MapStr) {
if val, ok := value.(string); ok {
value = strings.ToLower(val)
}
return field, value, labels
}
18 changes: 13 additions & 5 deletions metricbeat/helper/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,16 @@ func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapS
continue
}

// Apply extra options
allLabels := getLabels(metric)
for _, option := range m.GetOptions() {
field, value, allLabels = option.Process(field, value, allLabels)
}

// Convert labels
labels := common.MapStr{}
keyLabels := common.MapStr{}
for k, v := range getLabels(metric) {
for k, v := range allLabels {
if l, ok := mapping.Labels[k]; ok {
if l.IsKey() {
keyLabels.Put(l.GetField(), v)
Expand All @@ -151,10 +157,12 @@ func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapS
continue
}

// Put it in the event if it's a common metric
event := getEvent(eventsMap, keyLabels)
event.Put(field, value)
event.DeepUpdate(labels)
if field != "" {
// Put it in the event if it's a common metric
event := getEvent(eventsMap, keyLabels)
event.Put(field, value)
event.DeepUpdate(labels)
}
}
}

Expand Down
41 changes: 29 additions & 12 deletions metricbeat/helper/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestPrometheus(t *testing.T) {
msg: "Label metrics",
mapping: &MetricsMapping{
Metrics: map[string]MetricMap{
"first_metric": LabelMetric("first.metric", "label3", false),
"first_metric": LabelMetric("first.metric", "label3"),
},
Labels: map[string]LabelMap{
"label1": Label("labels.label1"),
Expand All @@ -248,7 +248,7 @@ func TestPrometheus(t *testing.T) {
msg: "Label metrics, lowercase",
mapping: &MetricsMapping{
Metrics: map[string]MetricMap{
"first_metric": LabelMetric("first.metric", "label4", true),
"first_metric": LabelMetric("first.metric", "label4", OpLowercaseValue()),
},
Labels: map[string]LabelMap{
"label1": Label("labels.label1"),
Expand All @@ -265,6 +265,20 @@ func TestPrometheus(t *testing.T) {
},
},
},
{
msg: "Label metrics, filter",
mapping: &MetricsMapping{
Metrics: map[string]MetricMap{
"first_metric": LabelMetric("first.metric", "label4", OpLowercaseValue(), OpFilter(map[string]string{
"foo": "filtered",
})),
},
Labels: map[string]LabelMap{
"label1": Label("labels.label1"),
},
},
expected: []common.MapStr{},
},
{
msg: "Summary metric",
mapping: &MetricsMapping{
Expand Down Expand Up @@ -318,16 +332,19 @@ func TestPrometheus(t *testing.T) {
}

for _, test := range tests {
reporter := &mbtest.CapturingReporterV2{}
p.ReportProcessedMetrics(test.mapping, reporter)
assert.Nil(t, reporter.GetErrors(), test.msg)
// Sort slice to avoid randomness
res := reporter.GetEvents()
sort.Slice(res, func(i, j int) bool {
return res[i].MetricSetFields.String() < res[j].MetricSetFields.String()
t.Run(test.msg, func(t *testing.T) {
reporter := &mbtest.CapturingReporterV2{}
p.ReportProcessedMetrics(test.mapping, reporter)
assert.Nil(t, reporter.GetErrors(), test.msg)
// Sort slice to avoid randomness
res := reporter.GetEvents()
sort.Slice(res, func(i, j int) bool {
return res[i].MetricSetFields.String() < res[j].MetricSetFields.String()
})
assert.Equal(t, len(test.expected), len(res))
for j, ev := range res {
assert.Equal(t, test.expected[j], ev.MetricSetFields, test.msg)
}
})
for j, ev := range res {
assert.Equal(t, test.expected[j], ev.MetricSetFields, test.msg)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ var (
"kube_pod_container_status_running": p.KeywordMetric("status.phase", "running"),
"kube_pod_container_status_terminated": p.KeywordMetric("status.phase", "terminated"),
"kube_pod_container_status_waiting": p.KeywordMetric("status.phase", "waiting"),
"kube_pod_container_status_terminated_reason": p.LabelMetric("status.reason", "reason", false),
"kube_pod_container_status_waiting_reason": p.LabelMetric("status.reason", "reason", false),
"kube_pod_container_status_terminated_reason": p.LabelMetric("status.reason", "reason"),
"kube_pod_container_status_waiting_reason": p.LabelMetric("status.reason", "reason"),
},

Labels: map[string]p.LabelMap{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
[
{
"_namespace": "node",
"cpu": {
"allocatable": {
"cores": 3
},
"capacity": {
"cores": 4
}
},
"memory": {
"allocatable": {
"bytes": 3097786880
},
"capacity": {
"bytes": 4097786880
}
},
"name": "minikube-test",
"pod": {
"allocatable": {
"total": 210
},
"capacity": {
"total": 310
}
},
"status": {
"ready": "true",
"unschedulable": true
}
},
{
"_namespace": "node",
"cpu": {
"allocatable": {
"cores": 2
},
"capacity": {
"cores": 2
}
},
"memory": {
"allocatable": {
"bytes": 2097786880
},
"capacity": {
"bytes": 2097786880
}
},
"name": "minikube",
"pod": {
"allocatable": {
"total": 110
},
"capacity": {
"total": 110
}
},
"status": {
"ready": "true",
"unschedulable": false
}
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[
{
"_namespace": "node",
"cpu": {
"allocatable": {
"cores": 2
},
"capacity": {
"cores": 2
}
},
"memory": {
"allocatable": {
"bytes": 1992347648
},
"capacity": {
"bytes": 2097205248
}
},
"name": "minikube",
"pod": {
"allocatable": {
"total": 110
},
"capacity": {
"total": 110
}
},
"status": {
"ready": "true",
"unschedulable": false
}
}
]
Loading

0 comments on commit 6a25c09

Please sign in to comment.