Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to store rates instead of cumulative counters when using Prometheus #15141

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions metricbeat/docs/modules/prometheus.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ metricbeat.modules:
period: 10s
hosts: ["localhost:9090"]
metrics_path: /metrics

# Store counter rates (the increase since the previous fetch) instead of the original cumulative counters (Default: false)
#rate_counters: true

#username: "user"
#password: "secret"

Expand Down
105 changes: 105 additions & 0 deletions metricbeat/helper/prometheus/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package prometheus

import (
"time"

"github.com/elastic/beats/libbeat/common"
)

// CounterCache keeps a cache of the last value of all given counters
// and allows to calculate their rate since the last call.
// All methods are thread-unsafe and must not be called concurrently.
type CounterCache interface {
// Start the cache cleanup worker. It must be called once before starting to
// use the cache.
Start()

// Stop the cache cleanup worker. It must be called when the cache is disposed.
Stop()

// RateUint64 returns, for a given counter name, the difference between the given value
// and the value that was given in a previous call. It will return 0 on the first call.
RateUint64(counterName string, value uint64) uint64

// RateFloat64 returns, for a given counter name, the difference between the given value
// and the value that was given in a previous call. It will return 0.0 on the first call.
RateFloat64(counterName string, value float64) float64
}

// counterCache is the CounterCache implementation returned by NewCounterCache.
type counterCache struct {
// ints stores the last value passed to RateUint64, keyed by counter name.
ints *common.Cache
// floats stores the last value passed to RateFloat64, keyed by counter name.
floats *common.Cache
// timeout is applied to every stored value and is also handed to the
// cleanup janitors in Start.
timeout time.Duration
}

// NewCounterCache builds a CounterCache backed by two separate caches, one
// for uint64 counters and one for float64 counters. The timeout parameter is
// used to automatically expire counters that haven't been updated in a whole
// timeout period.
func NewCounterCache(timeout time.Duration) CounterCache {
	cache := new(counterCache)
	cache.ints = common.NewCache(timeout, 0)
	cache.floats = common.NewCache(timeout, 0)
	cache.timeout = timeout
	return cache
}

// RateUint64 stores value as the latest reading for counterName and returns
// how much it grew compared to the reading given in the previous call.
// It returns 0 on the first call for a counter, and also when the previous
// reading is greater than value (the counter was reset).
func (c *counterCache) RateUint64(counterName string, value uint64) uint64 {
	stored := c.ints.PutWithTimeout(counterName, value, c.timeout)
	if stored == nil {
		// first put for this value, return rate of 0
		return 0
	}

	last := stored.(uint64)
	if last > value {
		// counter reset
		return 0
	}
	return value - last
}

// RateFloat64 stores value as the latest reading for counterName and returns
// how much it grew compared to the reading given in the previous call.
// It returns 0.0 on the first call for a counter, and also when the previous
// reading is greater than value (the counter was reset).
func (c *counterCache) RateFloat64(counterName string, value float64) float64 {
	stored := c.floats.PutWithTimeout(counterName, value, c.timeout)
	if stored == nil {
		// first put for this value, return rate of 0
		return 0
	}

	last := stored.(float64)
	if last > value {
		// counter reset
		return 0
	}
	return value - last
}

// Start the cache cleanup worker of both underlying caches (uint64 first,
// then float64), using the cache timeout as the janitor argument. It must be
// called once before starting to use the cache.
func (c *counterCache) Start() {
	for _, cache := range []*common.Cache{c.ints, c.floats} {
		cache.StartJanitor(c.timeout)
	}
}

// Stop the cache cleanup worker of both underlying caches (uint64 first,
// then float64). It must be called when the cache is disposed.
func (c *counterCache) Stop() {
	for _, cache := range []*common.Cache{c.ints, c.floats} {
		cache.StopJanitor()
	}
}
78 changes: 78 additions & 0 deletions metricbeat/helper/prometheus/counter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package prometheus

import (
"testing"
"time"

"github.com/elastic/beats/libbeat/common"
)

// Test_CounterCache feeds sequences of readings into a CounterCache and checks
// the value returned for each one: 0 for the first reading, the difference to
// the previous reading afterwards, and 0 when a reading is lower than the
// previous one (counter reset). Both the uint64 and float64 variants are
// exercised against the same counter name.
func Test_CounterCache(t *testing.T) {
	// fields mirrors the layout of counterCache; nothing below references it.
	type fields struct {
		ints    *common.Cache
		floats  *common.Cache
		timeout time.Duration
	}

	type scenario struct {
		name        string
		cache       CounterCache
		counter     string
		uintInputs  []uint64
		uintDeltas  []uint64
		floatInputs []float64
		floatDeltas []float64
	}

	scenarios := []scenario{
		{
			name:        "rates are calculated",
			cache:       NewCounterCache(1 * time.Second),
			counter:     "test_counter",
			uintInputs:  []uint64{10, 14, 17, 17, 28},
			uintDeltas:  []uint64{0, 4, 3, 0, 11},
			floatInputs: []float64{1.0, 101.0, 102.0, 102.0, 1034.0},
			floatDeltas: []float64{0.0, 100.0, 1.0, 0.0, 932.0},
		},
		{
			name:        "counter reset",
			cache:       NewCounterCache(1 * time.Second),
			counter:     "test_counter",
			uintInputs:  []uint64{10, 14, 17, 1, 3},
			uintDeltas:  []uint64{0, 4, 3, 0, 2},
			floatInputs: []float64{1.0, 101.0, 2.0, 13.0},
			floatDeltas: []float64{0.0, 100.0, 0.0, 11.0},
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			// Readings must be applied in order: each result depends on the
			// value stored by the previous call.
			for idx := range sc.uintInputs {
				want := sc.uintDeltas[idx]
				got := sc.cache.RateUint64(sc.counter, sc.uintInputs[idx])
				if got != want {
					t.Errorf("counterCache.RateUint64() = %v, want %v", got, want)
				}
			}

			for idx := range sc.floatInputs {
				want := sc.floatDeltas[idx]
				got := sc.cache.RateFloat64(sc.counter, sc.floatInputs[idx])
				if got != want {
					t.Errorf("counterCache.RateFloat64() = %v, want %v", got, want)
				}
			}
		})
	}
}
4 changes: 2 additions & 2 deletions metricbeat/helper/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ func (p *prometheus) GetFamilies() ([]*dto.MetricFamily, error) {
break
}
return nil, errors.Wrap(err, "decoding of metric family failed")
} else {
families = append(families, mf)
}

families = append(families, mf)
}

return families, nil
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,10 @@ metricbeat.modules:
period: 10s
hosts: ["localhost:9090"]
metrics_path: /metrics

# Store counter rates (the increase since the previous fetch) instead of the original cumulative counters (Default: false)
#rate_counters: true

#username: "user"
#password: "secret"

Expand Down
4 changes: 4 additions & 0 deletions metricbeat/module/prometheus/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
period: 10s
hosts: ["localhost:9090"]
metrics_path: /metrics

# Store counter rates (the increase since the previous fetch) instead of the original cumulative counters (Default: false)
#rate_counters: true

#username: "user"
#password: "secret"

Expand Down
31 changes: 28 additions & 3 deletions metricbeat/module/prometheus/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,36 @@ func init() {
// MetricSet is the metricset type constructed by New.
type MetricSet struct {
mb.BaseMetricSet
// prometheus is the client created with p.NewPrometheusClient in New.
prometheus p.Prometheus
// counters is only set by New when config.RateCounters is enabled;
// it is nil otherwise, and Close checks for that before stopping it.
counters p.CounterCache
// config has type Config; presumably the unpacked module
// configuration — TODO confirm where it is populated.
config Config
}

// New creates a new metricset
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
config := defaultConfig()
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}

prometheus, err := p.NewPrometheusClient(base)
if err != nil {
return nil, err
}

return &MetricSet{
metricset := MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
}, nil
}

// Calculate rate on counters?
if config.RateCounters {
// use a counter cache with a timeout of 5x the period, as a safe value
// to make sure that all counters are available between fetches
metricset.counters = p.NewCounterCache(base.Module().Config().Period * 5)
metricset.counters.Start()
}

return &metricset, nil
}

// Fetch fetches data and reports it
Expand All @@ -76,7 +93,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
eventList := map[string]common.MapStr{}

for _, family := range families {
promEvents := getPromEventsFromMetricFamily(family)
promEvents := getPromEventsFromMetricFamily(family, m.counters)

for _, promEvent := range promEvents {
labelsHash := promEvent.LabelsHash()
Expand Down Expand Up @@ -117,3 +134,11 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {

return nil
}

// Close stops the metricset. When a counter cache was created, its cleanup
// worker is stopped; without one there is nothing to release. It always
// returns nil.
func (m *MetricSet) Close() error {
	if m.counters == nil {
		return nil
	}
	m.counters.Stop()
	return nil
}
Loading