Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sum stat to basicstats aggregator #3797

Merged
merged 2 commits into from
Mar 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions plugins/aggregators/basicstats/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# BasicStats Aggregator Plugin

The BasicStats aggregator plugin give us count,max,min,mean,s2(variance), stdev for a set of values,
The BasicStats aggregator plugin give us count,max,min,mean,sum,s2(variance), stdev for a set of values,
emitting the aggregate every `period` seconds.

### Configuration:
Expand All @@ -21,11 +21,11 @@ emitting the aggregate every `period` seconds.
## BasicStats Arguments:

## Configures which basic stats to push as fields
stats = ["count","min","max","mean","stdev","s2"]
stats = ["count","min","max","mean","stdev","s2","sum"]
```

- stats
- If not specified, all stats are aggregated and pushed as fields
- If not specified, then `count`, `min`, `max`, `mean`, `stdev`, and `s2` are aggregated and pushed as fields. `sum` is not aggregated by default to maintain backwards compatibility.
- If empty array, no stats are aggregated

### Measurements & Fields:
Expand All @@ -35,6 +35,7 @@ emitting the aggregate every `period` seconds.
- field1_max
- field1_min
- field1_mean
- field1_sum
- field1_s2 (variance)
- field1_stdev (standard deviation)

Expand All @@ -48,8 +49,8 @@ No tags are applied by this aggregator.
$ telegraf --config telegraf.conf --quiet
system,host=tars load1=1 1475583980000000000
system,host=tars load1=1 1475583990000000000
system,host=tars load1_count=2,load1_max=1,load1_min=1,load1_mean=1,load1_s2=0,load1_stdev=0 1475584010000000000
system,host=tars load1_count=2,load1_max=1,load1_min=1,load1_mean=1,load1_sum=2,load1_s2=0,load1_stdev=0 1475584010000000000
system,host=tars load1=1 1475584020000000000
system,host=tars load1=3 1475584030000000000
system,host=tars load1_count=2,load1_max=3,load1_min=1,load1_mean=2,load1_s2=2,load1_stdev=1.414162 1475584010000000000
system,host=tars load1_count=2,load1_max=3,load1_min=1,load1_mean=2,load1_sum=4,load1_s2=2,load1_stdev=1.414162 1475584010000000000
```
12 changes: 12 additions & 0 deletions plugins/aggregators/basicstats/basicstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type configuredStats struct {
mean bool
variance bool
stdev bool
sum bool
}

func NewBasicStats() *BasicStats {
Expand All @@ -40,6 +41,7 @@ type basicstats struct {
count float64
min float64
max float64
sum float64
mean float64
M2 float64 //intermedia value for variance/stdev
}
Expand Down Expand Up @@ -77,6 +79,7 @@ func (m *BasicStats) Add(in telegraf.Metric) {
min: fv,
max: fv,
mean: fv,
sum: fv,
M2: 0.0,
}
}
Expand All @@ -92,6 +95,7 @@ func (m *BasicStats) Add(in telegraf.Metric) {
min: fv,
max: fv,
mean: fv,
sum: fv,
M2: 0.0,
}
continue
Expand Down Expand Up @@ -119,6 +123,8 @@ func (m *BasicStats) Add(in telegraf.Metric) {
} else if fv > tmp.max {
tmp.max = fv
}
//sum compute
tmp.sum += fv
//store final data
m.cache[id].fields[k] = tmp
}
Expand Down Expand Up @@ -146,6 +152,9 @@ func (m *BasicStats) Push(acc telegraf.Accumulator) {
if config.mean {
fields[k+"_mean"] = v.mean
}
if config.sum {
fields[k+"_sum"] = v.sum
}

//v.count always >=1
if v.count > 1 {
Expand Down Expand Up @@ -187,6 +196,8 @@ func parseStats(names []string) *configuredStats {
parsed.variance = true
case "stdev":
parsed.stdev = true
case "sum":
parsed.sum = true

default:
log.Printf("W! Unrecognized basic stat '%s', ignoring", name)
Expand All @@ -206,6 +217,7 @@ func defaultStats() *configuredStats {
defaults.mean = true
defaults.variance = true
defaults.stdev = true
defaults.sum = false

return defaults
}
Expand Down
152 changes: 152 additions & 0 deletions plugins/aggregators/basicstats/basicstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
)

var m1, _ = metric.New("m1",
Expand Down Expand Up @@ -250,6 +251,83 @@ func TestBasicStatsWithOnlyMean(t *testing.T) {
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test only aggregating sum
func TestBasicStatsWithOnlySum(t *testing.T) {

aggregator := NewBasicStats()
aggregator.Stats = []string{"sum"}

aggregator.Add(m1)
aggregator.Add(m2)

acc := testutil.Accumulator{}
aggregator.Push(&acc)

expectedFields := map[string]interface{}{
"a_sum": float64(2),
"b_sum": float64(4),
"c_sum": float64(6),
"d_sum": float64(8),
"e_sum": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Verify that sum doesn't suffer from floating point errors. Early
// implementations of sum were calulated from mean and count, which
// e.g. summed "1, 1, 5, 1" as "7.999999..." instead of 8.
func TestBasicStatsWithOnlySumFloatingPointErrata(t *testing.T) {

var sum1, _ = metric.New("m1",
map[string]string{},
map[string]interface{}{
"a": int64(1),
},
time.Now(),
)
var sum2, _ = metric.New("m1",
map[string]string{},
map[string]interface{}{
"a": int64(1),
},
time.Now(),
)
var sum3, _ = metric.New("m1",
map[string]string{},
map[string]interface{}{
"a": int64(5),
},
time.Now(),
)
var sum4, _ = metric.New("m1",
map[string]string{},
map[string]interface{}{
"a": int64(1),
},
time.Now(),
)

aggregator := NewBasicStats()
aggregator.Stats = []string{"sum"}

aggregator.Add(sum1)
aggregator.Add(sum2)
aggregator.Add(sum3)
aggregator.Add(sum4)

acc := testutil.Accumulator{}
aggregator.Push(&acc)

expectedFields := map[string]interface{}{
"a_sum": float64(8),
}
expectedTags := map[string]string{}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test only aggregating variance
func TestBasicStatsWithOnlyVariance(t *testing.T) {

Expand Down Expand Up @@ -328,6 +406,57 @@ func TestBasicStatsWithMinAndMax(t *testing.T) {
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test aggregating with all stats
func TestBasicStatsWithAllStats(t *testing.T) {
acc := testutil.Accumulator{}
minmax := NewBasicStats()
minmax.Stats = []string{"count", "min", "max", "mean", "stdev", "s2", "sum"}

minmax.Add(m1)
minmax.Add(m2)
minmax.Push(&acc)

expectedFields := map[string]interface{}{
"a_count": float64(2), //a
"a_max": float64(1),
"a_min": float64(1),
"a_mean": float64(1),
"a_stdev": float64(0),
"a_s2": float64(0),
"a_sum": float64(2),
"b_count": float64(2), //b
"b_max": float64(3),
"b_min": float64(1),
"b_mean": float64(2),
"b_s2": float64(2),
"b_sum": float64(4),
"b_stdev": math.Sqrt(2),
"c_count": float64(2), //c
"c_max": float64(4),
"c_min": float64(2),
"c_mean": float64(3),
"c_s2": float64(2),
"c_stdev": math.Sqrt(2),
"c_sum": float64(6),
"d_count": float64(2), //d
"d_max": float64(6),
"d_min": float64(2),
"d_mean": float64(4),
"d_s2": float64(8),
"d_stdev": math.Sqrt(8),
"d_sum": float64(8),
"e_count": float64(1), //e
"e_max": float64(200),
"e_min": float64(200),
"e_mean": float64(200),
"e_sum": float64(200),
}
expectedTags := map[string]string{
"foo": "bar",
}
acc.AssertContainsTaggedFields(t, "m1", expectedFields, expectedTags)
}

// Test that if an empty array is passed, no points are pushed
func TestBasicStatsWithNoStats(t *testing.T) {

Expand Down Expand Up @@ -357,3 +486,26 @@ func TestBasicStatsWithUnknownStat(t *testing.T) {

acc.AssertDoesNotContainMeasurement(t, "m1")
}

// Test that if Stats isn't supplied, then we only do count, min, max, mean,
// stdev, and s2. We purposely exclude sum for backwards compatability,
// otherwise user's working systems will suddenly (and surprisingly) start
// capturing sum without their input.
func TestBasicStatsWithDefaultStats(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be overkill, but I like the idea of verifying "default configuration" separately from verifying "aggregations are working".


aggregator := NewBasicStats()

aggregator.Add(m1)
aggregator.Add(m2)

acc := testutil.Accumulator{}
aggregator.Push(&acc)

assert.True(t, acc.HasField("m1", "a_count"))
assert.True(t, acc.HasField("m1", "a_min"))
assert.True(t, acc.HasField("m1", "a_max"))
assert.True(t, acc.HasField("m1", "a_mean"))
assert.True(t, acc.HasField("m1", "a_stdev"))
assert.True(t, acc.HasField("m1", "a_s2"))
assert.False(t, acc.HasField("m1", "a_sum"))
}