-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
added new basicstats aggregator for max,min,count,mean,stdev stats
- Loading branch information
1 parent
0f452ad
commit 0ce8426
Showing
4 changed files
with
351 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package all | ||
|
||
import ( | ||
_ "github.com/influxdata/telegraf/plugins/aggregators/basicstats" | ||
_ "github.com/influxdata/telegraf/plugins/aggregators/histogram" | ||
_ "github.com/influxdata/telegraf/plugins/aggregators/minmax" | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
# BasicStats Aggregator Plugin | ||
|
||
The BasicStats aggregator plugin give us count,max,min,mean,s2(variance), stdev for a set of values, | ||
emitting the aggregate every `period` seconds. | ||
|
||
### Configuration: | ||
|
||
```toml | ||
# Keep the aggregate basicstats of each metric passing through. | ||
[[aggregators.basicstats]] | ||
## General Aggregator Arguments: | ||
## The period on which to flush & clear the aggregator. | ||
period = "30s" | ||
## If true, the original metric will be dropped by the | ||
## aggregator and will not get sent to the output plugins. | ||
drop_original = false | ||
``` | ||
|
||
### Measurements & Fields: | ||
|
||
- measurement1 | ||
- field1_count | ||
- field1_max | ||
- field1_min | ||
- field1_mean | ||
- field1_s2 (variance) | ||
- field1_stdev (standar deviation) | ||
|
||
### Tags: | ||
|
||
No tags are applied by this aggregator. | ||
|
||
### Example Output: | ||
|
||
``` | ||
$ telegraf --config telegraf.conf --quiet | ||
system,host=tars load1=1 1475583980000000000 | ||
system,host=tars load1=1 1475583990000000000 | ||
system,host=tars load1_count=2,load1_max=1,load1_min=1,load1_mean=1,load1_s2=0,load1_stdev=0 1475584010000000000 | ||
system,host=tars load1=1 1475584020000000000 | ||
system,host=tars load1=3 1475584030000000000 | ||
system,host=tars load1_count=2,load1_max=3,load1_min=1,load1_mean=2,load1_s2=2,load1_stdev=1.414162 1475584010000000000 | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
package basicstats | ||
|
||
import ( | ||
"math" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/plugins/aggregators" | ||
) | ||
|
||
type BasicStats struct { | ||
cache map[uint64]aggregate | ||
} | ||
|
||
func NewBasicStats() telegraf.Aggregator { | ||
mm := &BasicStats{} | ||
mm.Reset() | ||
return mm | ||
} | ||
|
||
type aggregate struct { | ||
fields map[string]basicstats | ||
name string | ||
tags map[string]string | ||
} | ||
|
||
type basicstats struct { | ||
count float64 | ||
min float64 | ||
max float64 | ||
mean float64 | ||
M2 float64 //intermemedia value for variance/stdev | ||
//stdev float64 | ||
} | ||
|
||
var sampleConfig = ` | ||
## General Aggregator Arguments: | ||
## The period on which to flush & clear the aggregator. | ||
period = "30s" | ||
## If true, the original metric will be dropped by the | ||
## aggregator and will not get sent to the output plugins. | ||
drop_original = false | ||
` | ||
|
||
func (m *BasicStats) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
func (m *BasicStats) Description() string { | ||
return "Keep the aggregate basicstats of each metric passing through." | ||
} | ||
|
||
func (m *BasicStats) Add(in telegraf.Metric) { | ||
id := in.HashID() | ||
if _, ok := m.cache[id]; !ok { | ||
// hit an uncached metric, create caches for first time: | ||
a := aggregate{ | ||
name: in.Name(), | ||
tags: in.Tags(), | ||
fields: make(map[string]basicstats), | ||
} | ||
for k, v := range in.Fields() { | ||
if fv, ok := convert(v); ok { | ||
a.fields[k] = basicstats{ | ||
count: 1, | ||
min: fv, | ||
max: fv, | ||
mean: fv, | ||
M2: 0.0, | ||
} | ||
} | ||
} | ||
m.cache[id] = a | ||
} else { | ||
for k, v := range in.Fields() { | ||
if fv, ok := convert(v); ok { | ||
if _, ok := m.cache[id].fields[k]; !ok { | ||
// hit an uncached field of a cached metric | ||
m.cache[id].fields[k] = basicstats{ | ||
count: 1, | ||
min: fv, | ||
max: fv, | ||
mean: fv, | ||
M2: 0.0, | ||
} | ||
continue | ||
} | ||
|
||
tmp := m.cache[id].fields[k] | ||
//https://en.m.wikipedia.org/wiki/Algorithms_for_calculating_variance | ||
//variable initialization | ||
x := fv | ||
mean := tmp.mean | ||
M2 := tmp.M2 | ||
//counter compute | ||
n := tmp.count + 1 | ||
tmp.count = n | ||
//mean compute | ||
delta := x - mean | ||
mean = mean + delta/n | ||
tmp.mean = mean | ||
//variance/stdev compute | ||
M2 = M2 + delta*(x-mean) | ||
tmp.M2 = M2 | ||
//max/min compute | ||
if fv < tmp.min { | ||
tmp.min = fv | ||
} else if fv > tmp.max { | ||
tmp.max = fv | ||
} | ||
//store final data | ||
m.cache[id].fields[k] = tmp | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (m *BasicStats) Push(acc telegraf.Accumulator) { | ||
for _, aggregate := range m.cache { | ||
fields := map[string]interface{}{} | ||
for k, v := range aggregate.fields { | ||
fields[k+"_count"] = v.count | ||
fields[k+"_min"] = v.min | ||
fields[k+"_max"] = v.max | ||
fields[k+"_mean"] = v.mean | ||
//v.count always >=1 | ||
if v.count > 1 { | ||
variance := v.M2 / (v.count - 1) | ||
fields[k+"_s2"] = variance | ||
fields[k+"_stdev"] = math.Sqrt(variance) | ||
} | ||
//if count == 1 StdDev = infinite => so I won't send data | ||
} | ||
acc.AddFields(aggregate.name, fields, aggregate.tags) | ||
} | ||
} | ||
|
||
func (m *BasicStats) Reset() { | ||
m.cache = make(map[uint64]aggregate) | ||
} | ||
|
||
func convert(in interface{}) (float64, bool) { | ||
switch v := in.(type) { | ||
case float64: | ||
return v, true | ||
case int64: | ||
return float64(v), true | ||
default: | ||
return 0, false | ||
} | ||
} | ||
|
||
func init() { | ||
aggregators.Add("basicstats", func() telegraf.Aggregator { | ||
return NewBasicStats() | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
package basicstats | ||
|
||
import ( | ||
"math" | ||
"testing" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf/metric" | ||
"github.com/influxdata/telegraf/testutil" | ||
) | ||
|
||
var m1, _ = metric.New("m1", | ||
map[string]string{"foo": "bar"}, | ||
map[string]interface{}{ | ||
"a": int64(1), | ||
"b": int64(1), | ||
"c": float64(2), | ||
"d": float64(2), | ||
}, | ||
time.Now(), | ||
) | ||
var m2, _ = metric.New("m1", | ||
map[string]string{"foo": "bar"}, | ||
map[string]interface{}{ | ||
"a": int64(1), | ||
"b": int64(3), | ||
"c": float64(4), | ||
"d": float64(6), | ||
"e": float64(200), | ||
"ignoreme": "string", | ||
"andme": true, | ||
}, | ||
time.Now(), | ||
) | ||
|
||
func BenchmarkApply(b *testing.B) { | ||
minmax := NewBasicStats() | ||
|
||
for n := 0; n < b.N; n++ { | ||
minmax.Add(m1) | ||
minmax.Add(m2) | ||
} | ||
} | ||
|
||
// Test two metrics getting added. | ||
func TestBasicStatsWithPeriod(t *testing.T) { | ||
acc := testutil.Accumulator{} | ||
minmax := NewBasicStats() | ||
|
||
minmax.Add(m1) | ||
minmax.Add(m2) | ||
minmax.Push(&acc) | ||
|
||
expectedFields := map[string]interface{}{ | ||
"a_count": float64(2), //a | ||
"a_max": float64(1), | ||
"a_min": float64(1), | ||
"a_mean": float64(1), | ||
"a_stdev": float64(0), | ||
"a_s2": float64(0), | ||
"b_count": float64(2), //b | ||
"b_max": float64(3), | ||
"b_min": float64(1), | ||
"b_mean": float64(2), | ||
"b_s2": float64(2), | ||
"b_stdev": math.Sqrt(2), | ||
"c_count": float64(2), //c | ||
"c_max": float64(4), | ||
"c_min": float64(2), | ||
"c_mean": float64(3), | ||
"c_s2": float64(2), | ||
"c_stdev": math.Sqrt(2), | ||
"d_count": float64(2), //d | ||
"d_max": float64(6), | ||
"d_min": float64(2), | ||
"d_mean": float64(4), | ||
"d_s2": float64(8), | ||
"d_stdev": math.Sqrt(8), | ||
"e_count": float64(1), //e | ||
"e_max": float64(200), | ||
"e_min": float64(200), | ||
"e_mean": float64(200), | ||
} | ||
expectedTags := map[string]string{ | ||
"foo": "bar", | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
} | ||
|
||
// Test two metrics getting added with a push/reset in between (simulates | ||
// getting added in different periods.) | ||
func TestBasicStatsDifferentPeriods(t *testing.T) { | ||
acc := testutil.Accumulator{} | ||
minmax := NewBasicStats() | ||
|
||
minmax.Add(m1) | ||
minmax.Push(&acc) | ||
expectedFields := map[string]interface{}{ | ||
"a_count": float64(1), //a | ||
"a_max": float64(1), | ||
"a_min": float64(1), | ||
"a_mean": float64(1), | ||
"b_count": float64(1), //b | ||
"b_max": float64(1), | ||
"b_min": float64(1), | ||
"b_mean": float64(1), | ||
"c_count": float64(1), //c | ||
"c_max": float64(2), | ||
"c_min": float64(2), | ||
"c_mean": float64(2), | ||
"d_count": float64(1), //d | ||
"d_max": float64(2), | ||
"d_min": float64(2), | ||
"d_mean": float64(2), | ||
} | ||
expectedTags := map[string]string{ | ||
"foo": "bar", | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
|
||
acc.ClearMetrics() | ||
minmax.Reset() | ||
minmax.Add(m2) | ||
minmax.Push(&acc) | ||
expectedFields = map[string]interface{}{ | ||
"a_count": float64(1), //a | ||
"a_max": float64(1), | ||
"a_min": float64(1), | ||
"a_mean": float64(1), | ||
"b_count": float64(1), //b | ||
"b_max": float64(3), | ||
"b_min": float64(3), | ||
"b_mean": float64(3), | ||
"c_count": float64(1), //c | ||
"c_max": float64(4), | ||
"c_min": float64(4), | ||
"c_mean": float64(4), | ||
"d_count": float64(1), //d | ||
"d_max": float64(6), | ||
"d_min": float64(6), | ||
"d_mean": float64(6), | ||
"e_count": float64(1), //e | ||
"e_max": float64(200), | ||
"e_min": float64(200), | ||
"e_mean": float64(200), | ||
} | ||
expectedTags = map[string]string{ | ||
"foo": "bar", | ||
} | ||
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags) | ||
} |