Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added new basicstats aggregator #2167

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/aggregators/all/all.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package all

import (
_ "github.com/influxdata/telegraf/plugins/aggregators/basicstats"
_ "github.com/influxdata/telegraf/plugins/aggregators/histogram"
_ "github.com/influxdata/telegraf/plugins/aggregators/minmax"
)
43 changes: 43 additions & 0 deletions plugins/aggregators/basicstats/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# BasicStats Aggregator Plugin

The BasicStats aggregator plugin give us count,max,min,mean,s2(variance), stdev for a set of values,
emitting the aggregate every `period` seconds.

### Configuration:

```toml
# Keep the aggregate basicstats of each metric passing through.
[[aggregators.basicstats]]
## General Aggregator Arguments:
## The period on which to flush & clear the aggregator.
period = "30s"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false
```

### Measurements & Fields:

- measurement1
- field1_count
- field1_max
- field1_min
- field1_mean
- field1_s2 (variance)
- field1_stdev (standar deviation)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo in standard


### Tags:

No tags are applied by this aggregator.

### Example Output:

```
$ telegraf --config telegraf.conf --quiet
system,host=tars load1=1 1475583980000000000
system,host=tars load1=1 1475583990000000000
system,host=tars load1_count=2,load1_max=1,load1_min=1,load1_mean=1,load1_s2=0,load1_stdev=0 1475584010000000000
system,host=tars load1=1 1475584020000000000
system,host=tars load1=3 1475584030000000000
system,host=tars load1_count=2,load1_max=3,load1_min=1,load1_mean=2,load1_s2=2,load1_stdev=1.414162 1475584010000000000
```
156 changes: 156 additions & 0 deletions plugins/aggregators/basicstats/basicstats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package basicstats

import (
"math"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/aggregators"
)

type BasicStats struct {
cache map[uint64]aggregate
}

func NewBasicStats() telegraf.Aggregator {
mm := &BasicStats{}
mm.Reset()
return mm
}

type aggregate struct {
fields map[string]basicstats
name string
tags map[string]string
}

type basicstats struct {
count float64
min float64
max float64
mean float64
M2 float64 //intermemedia value for variance/stdev
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

//stdev float64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

}

var sampleConfig = `
## General Aggregator Arguments:
## The period on which to flush & clear the aggregator.
period = "30s"
## If true, the original metric will be dropped by the
## aggregator and will not get sent to the output plugins.
drop_original = false
`

func (m *BasicStats) SampleConfig() string {
return sampleConfig
}

func (m *BasicStats) Description() string {
return "Keep the aggregate basicstats of each metric passing through."
}

func (m *BasicStats) Add(in telegraf.Metric) {
id := in.HashID()
if _, ok := m.cache[id]; !ok {
// hit an uncached metric, create caches for first time:
a := aggregate{
name: in.Name(),
tags: in.Tags(),
fields: make(map[string]basicstats),
}
for k, v := range in.Fields() {
if fv, ok := convert(v); ok {
a.fields[k] = basicstats{
count: 1,
min: fv,
max: fv,
mean: fv,
M2: 0.0,
}
}
}
m.cache[id] = a
} else {
for k, v := range in.Fields() {
if fv, ok := convert(v); ok {
if _, ok := m.cache[id].fields[k]; !ok {
// hit an uncached field of a cached metric
m.cache[id].fields[k] = basicstats{
count: 1,
min: fv,
max: fv,
mean: fv,
M2: 0.0,
}
continue
}

tmp := m.cache[id].fields[k]
//https://en.m.wikipedia.org/wiki/Algorithms_for_calculating_variance
//variable initialization
x := fv
mean := tmp.mean
M2 := tmp.M2
//counter compute
n := tmp.count + 1
tmp.count = n
//mean compute
delta := x - mean
mean = mean + delta/n
tmp.mean = mean
//variance/stdev compute
M2 = M2 + delta*(x-mean)
tmp.M2 = M2
//max/min compute
if fv < tmp.min {
tmp.min = fv
} else if fv > tmp.max {
tmp.max = fv
}
//store final data
m.cache[id].fields[k] = tmp
}
}
}
}

func (m *BasicStats) Push(acc telegraf.Accumulator) {
for _, aggregate := range m.cache {
fields := map[string]interface{}{}
for k, v := range aggregate.fields {
fields[k+"_count"] = v.count
fields[k+"_min"] = v.min
fields[k+"_max"] = v.max
fields[k+"_mean"] = v.mean
//v.count always >=1
if v.count > 1 {
variance := v.M2 / (v.count - 1)
fields[k+"_s2"] = variance
fields[k+"_stdev"] = math.Sqrt(variance)
}
//if count == 1 StdDev = infinite => so I won't send data
}
acc.AddFields(aggregate.name, fields, aggregate.tags)
}
}

func (m *BasicStats) Reset() {
m.cache = make(map[uint64]aggregate)
}

func convert(in interface{}) (float64, bool) {
switch v := in.(type) {
case float64:
return v, true
case int64:
return float64(v), true
default:
return 0, false
}
}

func init() {
aggregators.Add("basicstats", func() telegraf.Aggregator {
return NewBasicStats()
})
}
151 changes: 151 additions & 0 deletions plugins/aggregators/basicstats/basicstats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package basicstats

import (
"math"
"testing"
"time"

"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
)

var m1, _ = metric.New("m1",
map[string]string{"foo": "bar"},
map[string]interface{}{
"a": int64(1),
"b": int64(1),
"c": float64(2),
"d": float64(2),
},
time.Now(),
)
var m2, _ = metric.New("m1",
map[string]string{"foo": "bar"},
map[string]interface{}{
"a": int64(1),
"b": int64(3),
"c": float64(4),
"d": float64(6),
"e": float64(200),
"ignoreme": "string",
"andme": true,
},
time.Now(),
)

func BenchmarkApply(b *testing.B) {
minmax := NewBasicStats()

for n := 0; n < b.N; n++ {
minmax.Add(m1)
minmax.Add(m2)
}
}

// Test two metrics getting added.
func TestBasicStatsWithPeriod(t *testing.T) {
acc := testutil.Accumulator{}
minmax := NewBasicStats()

minmax.Add(m1)
minmax.Add(m2)
minmax.Push(&acc)

expectedFields := map[string]interface{}{
"a_count": float64(2), //a
"a_max": float64(1),
"a_min": float64(1),
"a_mean": float64(1),
"a_stdev": float64(0),
"a_s2": float64(0),
"b_count": float64(2), //b
"b_max": float64(3),
"b_min": float64(1),
"b_mean": float64(2),
"b_s2": float64(2),
"b_stdev": math.Sqrt(2),
"c_count": float64(2), //c
"c_max": float64(4),
"c_min": float64(2),
"c_mean": float64(3),
"c_s2": float64(2),
"c_stdev": math.Sqrt(2),
"d_count": float64(2), //d
"d_max": float64(6),
"d_min": float64(2),
"d_mean": float64(4),
"d_s2": float64(8),
"d_stdev": math.Sqrt(8),
"e_count": float64(1), //e
"e_max": float64(200),
"e_min": float64(200),
"e_mean": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test two metrics getting added with a push/reset in between (simulates
// getting added in different periods.)
func TestBasicStatsDifferentPeriods(t *testing.T) {
acc := testutil.Accumulator{}
minmax := NewBasicStats()

minmax.Add(m1)
minmax.Push(&acc)
expectedFields := map[string]interface{}{
"a_count": float64(1), //a
"a_max": float64(1),
"a_min": float64(1),
"a_mean": float64(1),
"b_count": float64(1), //b
"b_max": float64(1),
"b_min": float64(1),
"b_mean": float64(1),
"c_count": float64(1), //c
"c_max": float64(2),
"c_min": float64(2),
"c_mean": float64(2),
"d_count": float64(1), //d
"d_max": float64(2),
"d_min": float64(2),
"d_mean": float64(2),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)

acc.ClearMetrics()
minmax.Reset()
minmax.Add(m2)
minmax.Push(&acc)
expectedFields = map[string]interface{}{
"a_count": float64(1), //a
"a_max": float64(1),
"a_min": float64(1),
"a_mean": float64(1),
"b_count": float64(1), //b
"b_max": float64(3),
"b_min": float64(3),
"b_mean": float64(3),
"c_count": float64(1), //c
"c_max": float64(4),
"c_min": float64(4),
"c_mean": float64(4),
"d_count": float64(1), //d
"d_max": float64(6),
"d_min": float64(6),
"d_mean": float64(6),
"e_count": float64(1), //e
"e_max": float64(200),
"e_min": float64(200),
"e_mean": float64(200),
}
expectedTags = map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}