From 5fcd2780abd38b45b59e3c96fae2f3a9c891df42 Mon Sep 17 00:00:00 2001 From: vlamug Date: Thu, 9 Feb 2017 19:50:23 +0300 Subject: [PATCH 01/19] The histogram aggregator plugin was added. --- etc/telegraf.conf | 26 ++ plugins/aggregators/all/all.go | 1 + plugins/aggregators/histogram/README.md | 105 ++++++++ plugins/aggregators/histogram/histogram.go | 239 ++++++++++++++++++ .../aggregators/histogram/histogram_test.go | 219 ++++++++++++++++ 5 files changed, 590 insertions(+) create mode 100644 plugins/aggregators/histogram/README.md create mode 100644 plugins/aggregators/histogram/histogram.go create mode 100644 plugins/aggregators/histogram/histogram_test.go diff --git a/etc/telegraf.conf b/etc/telegraf.conf index b7999b3e00fc4..247d90b3e1f2f 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -606,6 +606,32 @@ # drop_original = false +# # Configuration for aggregate histogram metrics +# [[aggregators.histogram]] +# ## General Aggregator Arguments: +# ## The period on which to flush & clear the aggregator. +# period = "30s" +# ## If true, the original metric will be dropped by the +# ## aggregator and will not get sent to the output plugins. +# drop_original = false +# +# ## The example of config to aggregate histogram for all fields of specified metric. +# [[aggregators.histogram.config]] +# ## The set of buckets. +# buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0] +# ## The name of metric. +# metric_name = "cpu" +# +# ## The example of config to aggregate for specified fields of metric. +# [[aggregators.histogram.config]] +# ## The set of buckets. +# buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] +# ## The name of metric. +# metric_name = "system" +# ## The concrete field of metric +# metric_field = "load1" + + ############################################################################### # INPUT PLUGINS # diff --git a/plugins/aggregators/all/all.go b/plugins/aggregators/all/all.go index 1041a0c9c4814..c4d430cc9aa3b 100644 --- a/plugins/aggregators/all/all.go +++ b/plugins/aggregators/all/all.go @@ -1,5 +1,6 @@ package all import ( + _ "github.com/influxdata/telegraf/plugins/aggregators/histogram" _ "github.com/influxdata/telegraf/plugins/aggregators/minmax" ) diff --git a/plugins/aggregators/histogram/README.md b/plugins/aggregators/histogram/README.md new file mode 100644 index 0000000000000..44fde226089bf --- /dev/null +++ b/plugins/aggregators/histogram/README.md @@ -0,0 +1,105 @@ +# Histogram Aggregator Plugin + +#### Goal + +This plugin was added for ability to build histograms. + +#### Description + +The histogram aggregator plugin aggregates values of specified metric\`s parameters. The metric is emitted every +`period` seconds. All you need to do is to specify borders of histogram buckets and parameters, for which you want to +aggregate histogram. + +#### How it works + +The each metric is passed to the aggregator and this aggregator searches histogram buckets for those parameters, which +have been specified in the config. If buckets are found, the aggregator will put +1 to appropriate bucket. +Otherwise, nothing will happen. Every `period` seconds these data will be pushed to output. + +Also, the algorithm of hit counting to buckets was implemented on the base of the algorithm, which is implemented in +the Prometheus [client](https://github.com/prometheus/client_golang/blob/master/prometheus/histogram.go). 
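The following self-contained Go sketch is illustrative only — it is not part of the plugin source, and the function name is made up — but it shows the counting scheme in miniature: each observation increments the first bucket whose right border is greater than or equal to the value, and the result is reported cumulatively.

```go
package main

import (
	"fmt"
	"sort"
)

// cumulativeCounts is a toy model of the scheme described above: every observed
// value increments the first bucket whose right border is >= the value (values
// larger than the last border fall into +Inf), and the final distribution is
// reported cumulatively, so each bucket also contains the hits of all smaller
// buckets.
func cumulativeCounts(borders, observations []float64) []uint64 {
	hits := make([]uint64, len(borders)+1) // the extra slot is the +Inf bucket

	for _, v := range observations {
		// index of the first border >= v; len(borders) means "+Inf"
		i := sort.SearchFloat64s(borders, v)
		hits[i]++
	}

	cumulative := make([]uint64, len(hits))
	var total uint64
	for i, n := range hits {
		total += n
		cumulative[i] = total
	}
	return cumulative
}

func main() {
	borders := []float64{0.0, 10.0, 20.0, 30.0}
	fmt.Println(cumulativeCounts(borders, []float64{15.3, 15.9, 40.0}))
	// prints: [0 0 2 2 3]
}
```

The plugin itself keeps such per-field counters in memory and, every `period`, emits one point per bucket border tagged with `le`.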
+ +### Configuration + +```toml +# Configuration for aggregate histogram metrics +[[aggregators.histogram]] + ## General Aggregator Arguments: + ## The period on which to flush & clear the aggregator. + period = "30s" + ## If true, the original metric will be dropped by the + ## aggregator and will not get sent to the output plugins. + drop_original = false + + ## The example of config to aggregate histogram for all fields of specified metric. + [[aggregators.histogram.config]] + ## The set of buckets. + buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] + ## The name of metric. + metric_name = "cpu" + + ## The example of config to aggregate histogram for concrete fields of specified metric. + [[aggregators.histogram.config]] + ## The set of buckets. + buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] + ## The name of metric. + metric_name = "diskio" + ## The concrete field of metric. + metric_field = "io_time" +``` + +#### Explanation + +The field `metric_field` is the parameter of metric. For example, the metric `cpu` has the following parameters: +usage_user, usage_system, usage_idle, usage_nice, usage_iowait, usage_irq, usage_softirq, usage_steal, usage_guest, +usage_guest_nice. + +Note that histogram metrics will be pushed every `period` seconds. +As you know telegraf calls aggregator `Reset()` func each `period` seconds. Histogram aggregator ignores `Reset()` and continues to count hits. +All counters in histogram will be resetted when one of buckets will be greater than the `math.MaxInt64`. + +#### Use cases + +You can specify parameters using two cases: + + 1. The specifying only metric name. In this case all parameters of metric will be aggregated. + 2. The specifying metric name and concrete parameters. + +#### Some rules + + - The setting of each histogram must be in separate section with title `aggregators.histogram.config`. + + - The each value of bucket must be float value. + + - Don\`t include the border bucket `+Inf`. It will be done automatically. + +### Measurements & Fields: + +The postfix `bucket` will be added to each parameter. + +- measurement1 + - field1_bucket + - field2_bucket + +### Tags: + +All measurements have tag `le`. This tag has the border value of bucket. + +### Example Output: + +The following output will return to the Prometheus client. 
+ +``` +cpu_usage_system_bucket{cpu="cpu-total",host="local",le="0"} 0 +cpu_usage_system_bucket{cpu="cpu-total",host="local",le="10"} 24 +cpu_usage_system_bucket{cpu="cpu-total",host="local",le="20"} 39 +cpu_usage_system_bucket{cpu="cpu-total",host="local",le="30"} 39 +cpu_usage_system_bucket{cpu="cpu-total",host="local",le="40"} 39 +cpu_usage_system_bucket{cpu="cpu-total",host="local",le="50"} 54 +cpu_usage_system_bucket{cpu="cpu-total",host="local",le="60"} 54 +cpu_usage_system_bucket{cpu="cpu-total",host="local",le="70"} 54 +cpu_usage_system_bucket{cpu="cpu-total",host="local",le="80"} 54 +cpu_usage_system_bucket{cpu="cpu-total",host="local",le="90"} 54 +cpu_usage_system_bucket{cpu="cpu-total",host="local",le="100"} 54 +cpu_usage_system_bucket{cpu="cpu-total",host="local",le="+Inf"} 54 +``` diff --git a/plugins/aggregators/histogram/histogram.go b/plugins/aggregators/histogram/histogram.go new file mode 100644 index 0000000000000..e241495ae2d7b --- /dev/null +++ b/plugins/aggregators/histogram/histogram.go @@ -0,0 +1,239 @@ +package histogram + +import ( + "fmt" + "math" + "sort" + "strconv" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/aggregators" +) + +// bucketTag is the tag, which contains right bucket border +const bucketTag = "le" + +// bucketInf is the right bucket border for infinite values +const bucketInf = "+Inf" + +// HistogramAggregator is aggregator with histogram configs and particular histograms for defined metrics +type HistogramAggregator struct { + Configs []config `toml:"config"` + + buckets bucketsByMetrics + cache map[uint64]metricHistogramCollection +} + +// config is the config, which contains name, field of metric and histogram buckets. +type config struct { + Metric string `toml:"metric_name"` + Field string `toml:"metric_field"` + Buckets buckets `toml:"buckets"` +} + +// bucketsByMetrics contains the buckets grouped by metric and field name +type bucketsByMetrics map[string]bucketsByFields + +// bucketsByFields contains the buckets grouped by field name +type bucketsByFields map[string]buckets + +// buckets contains the right borders buckets +type buckets []float64 + +// metricHistogramCollection aggregates the histogram data +type metricHistogramCollection struct { + histogramCollection map[string]counts + metric string + tags map[string]string +} + +// counts is the number of hits in the bucket +type counts []uint64 + +// NewHistogramAggregator creates new histogram aggregator +func NewHistogramAggregator() telegraf.Aggregator { + h := &HistogramAggregator{} + h.buckets = make(bucketsByMetrics) + h.resetCache() + + return h +} + +// sampleConfig is config sample of histogram aggregation plugin +var sampleConfig = ` + # # Configuration for aggregate histogram metrics + # [[aggregators.histogram]] + # ## General Aggregator Arguments: + # ## The period on which to flush & clear the aggregator. + # period = "30s" + # ## If true, the original metric will be dropped by the + # ## aggregator and will not get sent to the output plugins. + # drop_original = false + # + # ## The example of config to aggregate histogram for all fields of specified metric. + # [[aggregators.histogram.config]] + # ## The set of buckets. + # buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0] + # ## The name of metric. + # metric_name = "cpu" + # + # ## The example of config to aggregate for specified fields of metric. + # [[aggregators.histogram.config]] + # ## The set of buckets. 
+ # buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] + # ## The name of metric. + # metric_name = "system" + # ## The concrete field of metric + # metric_field = "load1" +` + +// SampleConfig returns sample of config +func (h *HistogramAggregator) SampleConfig() string { + return sampleConfig +} + +// Description returns description of aggregator plugin +func (h *HistogramAggregator) Description() string { + return "Keep the aggregate histogram of each metric passing through." +} + +// Add adds new hit to the buckets +func (h *HistogramAggregator) Add(in telegraf.Metric) { + id := in.HashID() + agr, ok := h.cache[id] + if !ok { + agr = metricHistogramCollection{ + metric: in.Name(), + tags: in.Tags(), + histogramCollection: make(map[string]counts), + } + } + + for field, value := range in.Fields() { + buckets := h.getBuckets(in.Name(), field) + if buckets == nil { + continue + } + + if agr.histogramCollection[field] == nil { + agr.histogramCollection[field] = make(counts, len(buckets)+1) + } + + if value, ok := convert(value); ok { + index := sort.SearchFloat64s(buckets, value) + if index < len(agr.histogramCollection[field]) { + agr.histogramCollection[field][index]++ + } + } + } + + h.cache[id] = agr +} + +// Push returns histogram values for metrics +func (h *HistogramAggregator) Push(acc telegraf.Accumulator) { + var isResetNeeded = false + + for _, aggregate := range h.cache { + for field, counts := range aggregate.histogramCollection { + + buckets := h.getBuckets(aggregate.metric, field) + count := uint64(0) + + for index, bucket := range buckets { + count += counts[index] + addFields(acc, aggregate, field, strconv.FormatFloat(bucket, 'f', 1, 64), count) + } + + // the adding a value to the infinitive bucket + count += counts[len(counts)-1] + addFields(acc, aggregate, field, bucketInf, count) + + // if count is more than max int 64, then we flush all counts of buckets + if count > math.MaxInt64 { + isResetNeeded = true + } + } + } + + if isResetNeeded { + h.resetCache() + } +} + +// Reset does nothing, because we need to collect counts for a long time, otherwise if config parameter 'reset' has +// small value, we will get a histogram with a small amount of the distribution. +func (h *HistogramAggregator) Reset() {} + +// resetCache resets cached counts(hits) in the buckets +func (h *HistogramAggregator) resetCache() { + h.cache = make(map[uint64]metricHistogramCollection) +} + +// checkAndGetBuckets checks the order of buckets and returns them. 
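// The buckets for a metric/field pair are looked up in the cache first; on a miss,
// the configured histograms are scanned for a matching metric name (and, optionally,
// field), the bucket order is validated via checkOrder, and the result is stored in
// h.buckets before being returned.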
+func (h *HistogramAggregator) getBuckets(metric string, field string) []float64 { + if buckets, ok := h.buckets[metric][field]; ok { + return buckets + } + + for _, config := range h.Configs { + if config.Metric == metric && (config.Field == "" || config.Field == field) { + if _, ok := h.buckets[metric]; !ok { + h.buckets[metric] = make(bucketsByFields) + } + + checkOrder(config.Buckets, metric, field) + + h.buckets[metric][field] = config.Buckets + } + } + + return h.buckets[metric][field] +} + +// addFields adds the field with specified tags to accumulator +func addFields(acc telegraf.Accumulator, agr metricHistogramCollection, field string, bucketTagVal string, count uint64) { + fields := map[string]interface{}{field + "_bucket": count} + + tags := map[string]string{} + for key, val := range agr.tags { + tags[key] = val + } + tags[bucketTag] = bucketTagVal + + acc.AddFields(agr.metric, fields, tags) +} + +// checkOrder checks the order of buckets, so that the current value must be more than previous value +func checkOrder(buckets []float64, metric string, field string) { + for i, bucket := range buckets { + if i < len(buckets)-1 && bucket >= buckets[i+1] { + panic(fmt.Errorf( + "histogram buckets must be in increasing order: %.2f >= %.2f, metrics: %s, field: %s", + bucket, + buckets[i+1], + metric, + field, + )) + } + } +} + +// convert converts interface to concrete type +func convert(in interface{}) (float64, bool) { + switch v := in.(type) { + case float64: + return v, true + case int64: + return float64(v), true + default: + return 0, false + } +} + +// init initializes histogram aggregator plugin +func init() { + aggregators.Add("histogram", func() telegraf.Aggregator { + return NewHistogramAggregator() + }) +} diff --git a/plugins/aggregators/histogram/histogram_test.go b/plugins/aggregators/histogram/histogram_test.go new file mode 100644 index 0000000000000..5f0c9b8ef1358 --- /dev/null +++ b/plugins/aggregators/histogram/histogram_test.go @@ -0,0 +1,219 @@ +package histogram + +import ( + "fmt" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" +) + +// NewTestHistogram creates new test histogram aggregation with specified config +func NewTestHistogram(cfg []config) telegraf.Aggregator { + htm := &HistogramAggregator{Configs: cfg} + htm.cache = make(map[uint64]metricHistogramCollection) + + return htm +} + +// firstMetric1 is the first test metric +var firstMetric1, _ = metric.New( + "first_metric_name", + map[string]string{"tag_name": "tag_value"}, + map[string]interface{}{ + "a": float64(15.3), + "b": float64(40), + }, + time.Now(), +) + +// firstMetric1 is the first test metric with other value +var firstMetric2, _ = metric.New( + "first_metric_name", + map[string]string{"tag_name": "tag_value"}, + map[string]interface{}{ + "a": float64(15.9), + }, + time.Now(), +) + +// secondMetric is the second metric +var secondMetric, _ = metric.New( + "second_metric_name", + map[string]string{"tag_name": "tag_value"}, + map[string]interface{}{ + "a": float64(105), + "ignoreme": "string", + "andme": true, + }, + time.Now(), +) + +// BenchmarkApply runs benchmarks +func BenchmarkApply(b *testing.B) { + histogram := NewHistogramAggregator() + + for n := 0; n < b.N; n++ { + histogram.Add(firstMetric1) + histogram.Add(firstMetric2) + histogram.Add(secondMetric) + } +} + +// TestHistogramWithPeriodAndOneField tests metrics for one 
period and for one field +func TestHistogramWithPeriodAndOneField(t *testing.T) { + var cfg []config + cfg = append(cfg, config{Metric: "first_metric_name", Field: "a", Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) + histogram := NewTestHistogram(cfg) + + acc := &testutil.Accumulator{} + + histogram.Add(firstMetric1) + histogram.Add(firstMetric2) + histogram.Push(acc) + + if len(acc.Metrics) != 6 { + assert.Fail(t, "Incorrect number of metrics") + } + + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(0), "0.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(0), "10.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "20.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "30.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "40.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), bucketInf) +} +// TestHistogramWithPeriodAndAllFields tests two metrics for one period and for all fields +func TestHistogramWithPeriodAndAllFields(t *testing.T) { + var cfg []config + cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 15.5, 20.0, 30.0, 40.0}}) + cfg = append(cfg, config{Metric: "second_metric_name", Buckets: []float64{0.0, 4.0, 10.0, 23.0, 30.0}}) + histogram := NewTestHistogram(cfg) + + acc := &testutil.Accumulator{} + + histogram.Add(firstMetric1) + histogram.Add(firstMetric2) + histogram.Add(secondMetric) + histogram.Push(acc) + + if len(acc.Metrics) != 30 { + assert.Fail(t, "Incorrect number of metrics") + } + + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(0), "0.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(1), "15.5") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "20.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "30.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "40.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), bucketInf) + + assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", uint64(0), "0.0") + assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", uint64(0), "15.5") + assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", uint64(0), "20.0") + assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", uint64(0), "30.0") + assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", uint64(1), "40.0") + assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", uint64(1), bucketInf) + + assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", uint64(0), "0.0") + assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", uint64(0), "4.0") + assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", uint64(0), "10.0") + assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", uint64(0), "23.0") + assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", uint64(0), "30.0") + assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", uint64(1), bucketInf) + + assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", uint64(0), "0.0") + assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", uint64(0), "4.0") + assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", uint64(0), "10.0") + 
assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", uint64(0), "23.0") + assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", uint64(0), "30.0") + assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", uint64(0), bucketInf) + + assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", uint64(0), "0.0") + assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", uint64(0), "4.0") + assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", uint64(0), "10.0") + assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", uint64(0), "23.0") + assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", uint64(0), "30.0") + assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", uint64(0), bucketInf) +} + +// TestHistogramDifferentPeriodsAndAllFields tests two metrics getting added with a push/reset in between (simulates +// getting added in different periods) for all fields +func TestHistogramDifferentPeriodsAndAllFields(t *testing.T) { + + var cfg []config + cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) + histogram := NewTestHistogram(cfg) + + acc := &testutil.Accumulator{} + histogram.Add(firstMetric1) + histogram.Push(acc) + + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(0), "0.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(0), "10.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(1), "20.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(1), "30.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(1), "40.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(1), bucketInf) + + acc.ClearMetrics() + histogram.Add(firstMetric2) + histogram.Push(acc) + + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(0), "0.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(0), "10.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "20.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "30.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "40.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), bucketInf) +} + +// TestWrongBucketsOrder tests the calling panic with incorrect order of buckets +func TestWrongBucketsOrder(t *testing.T) { + defer func() { + if r := recover(); r != nil { + assert.Equal( + t, + "histogram buckets must be in increasing order: 90.00 >= 20.00, metrics: first_metric_name, field: a", + fmt.Sprint(r), + ) + } + }() + + var cfg []config + cfg = append(cfg, config{Metric: "first_metric_name", Buckets: []float64{0.0, 90.0, 20.0, 30.0, 40.0}}) + histogram := NewTestHistogram(cfg) + histogram.Add(firstMetric2) +} + +// assertContainsTaggedField is help functions to test histogram data +func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricName string, field string, counts uint64, le string) { + expectedFields := map[string]interface{}{} + expectedFields[field] = counts + + acc.Lock() + defer acc.Unlock() + + for _, metric := range acc.Metrics { + if metric.Measurement != metricName { + continue + } + + if _, ok := metric.Fields[field]; !ok { + continue + } + + if metric.Tags[bucketTag] == le { + if assert.Equal(t, 
expectedFields, metric.Fields) { + return + } + + assert.Fail(t, fmt.Sprintf("incorrect fields %v of metric %s", expectedFields, metricName)) + } + } + + assert.Fail(t, fmt.Sprintf("unknown measurement %s with tags %v", metricName, []string{"tag_name", "le"})) +} From 9f9327f8f1c876e7d8fc84f1dee9479f58166315 Mon Sep 17 00:00:00 2001 From: vlamug Date: Thu, 9 Feb 2017 20:01:43 +0300 Subject: [PATCH 02/19] The gofmt tool was applied. --- plugins/aggregators/histogram/histogram_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugins/aggregators/histogram/histogram_test.go b/plugins/aggregators/histogram/histogram_test.go index 5f0c9b8ef1358..120eb2bb19821 100644 --- a/plugins/aggregators/histogram/histogram_test.go +++ b/plugins/aggregators/histogram/histogram_test.go @@ -86,6 +86,7 @@ func TestHistogramWithPeriodAndOneField(t *testing.T) { assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "40.0") assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), bucketInf) } + // TestHistogramWithPeriodAndAllFields tests two metrics for one period and for all fields func TestHistogramWithPeriodAndAllFields(t *testing.T) { var cfg []config From d24d011e4870b1ae5a6e79925e8e18322a39c1ab Mon Sep 17 00:00:00 2001 From: vlamug Date: Fri, 10 Feb 2017 09:56:49 +0300 Subject: [PATCH 03/19] The problem with test was fixed. --- plugins/aggregators/histogram/histogram_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/aggregators/histogram/histogram_test.go b/plugins/aggregators/histogram/histogram_test.go index 120eb2bb19821..461e2079bfd90 100644 --- a/plugins/aggregators/histogram/histogram_test.go +++ b/plugins/aggregators/histogram/histogram_test.go @@ -14,7 +14,8 @@ import ( // NewTestHistogram creates new test histogram aggregation with specified config func NewTestHistogram(cfg []config) telegraf.Aggregator { htm := &HistogramAggregator{Configs: cfg} - htm.cache = make(map[uint64]metricHistogramCollection) + htm.buckets = make(bucketsByMetrics) + htm.resetCache() return htm } From 53b63fada7fe4b752c9aebae055c97d82723fe85 Mon Sep 17 00:00:00 2001 From: vlamug Date: Tue, 14 Feb 2017 16:44:12 +0300 Subject: [PATCH 04/19] Changed the config format of histogram. --- etc/telegraf.conf | 6 +-- plugins/aggregators/histogram/README.md | 38 ++++++++++--------- plugins/aggregators/histogram/histogram.go | 34 +++++++++++++---- .../aggregators/histogram/histogram_test.go | 2 +- 4 files changed, 50 insertions(+), 30 deletions(-) diff --git a/etc/telegraf.conf b/etc/telegraf.conf index 247d90b3e1f2f..3e1395c02f3b4 100644 --- a/etc/telegraf.conf +++ b/etc/telegraf.conf @@ -627,9 +627,9 @@ # ## The set of buckets. # buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] # ## The name of metric. -# metric_name = "system" -# ## The concrete field of metric -# metric_field = "load1" +# metric_name = "diskio" +# ## The concrete fields of metric +# metric_fields = ["io_time", "read_time", "write_time"] diff --git a/plugins/aggregators/histogram/README.md b/plugins/aggregators/histogram/README.md index 44fde226089bf..247c128e48737 100644 --- a/plugins/aggregators/histogram/README.md +++ b/plugins/aggregators/histogram/README.md @@ -34,7 +34,7 @@ the Prometheus [client](https://github.com/prometheus/client_golang/blob/master/ ## The example of config to aggregate histogram for all fields of specified metric. [[aggregators.histogram.config]] ## The set of buckets. 
- buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] + buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0] ## The name of metric. metric_name = "cpu" @@ -44,19 +44,18 @@ the Prometheus [client](https://github.com/prometheus/client_golang/blob/master/ buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] ## The name of metric. metric_name = "diskio" - ## The concrete field of metric. - metric_field = "io_time" + ## The concrete fields of metric. + metric_fields = ["io_time", "read_time", "write_time"] ``` #### Explanation -The field `metric_field` is the parameter of metric. For example, the metric `cpu` has the following parameters: +The field `metric_fields` is the list of metric parameters. For example, the metric `cpu` has the following parameters: usage_user, usage_system, usage_idle, usage_nice, usage_iowait, usage_irq, usage_softirq, usage_steal, usage_guest, usage_guest_nice. Note that histogram metrics will be pushed every `period` seconds. As you know telegraf calls aggregator `Reset()` func each `period` seconds. Histogram aggregator ignores `Reset()` and continues to count hits. -All counters in histogram will be resetted when one of buckets will be greater than the `math.MaxInt64`. #### Use cases @@ -83,23 +82,26 @@ The postfix `bucket` will be added to each parameter. ### Tags: -All measurements have tag `le`. This tag has the border value of bucket. +All measurements have tag `le`. This tag has the border value of bucket. It means that the metric value is less or equal +to the value of this tag. For example, let assume that we have the metric value 10 and the following buckets: +[5, 10, 30, 70, 100]. Then the tag `le` will have the value 10, because the metrics value is passed into bucket with +right border value `10`. ### Example Output: The following output will return to the Prometheus client. 
``` -cpu_usage_system_bucket{cpu="cpu-total",host="local",le="0"} 0 -cpu_usage_system_bucket{cpu="cpu-total",host="local",le="10"} 24 -cpu_usage_system_bucket{cpu="cpu-total",host="local",le="20"} 39 -cpu_usage_system_bucket{cpu="cpu-total",host="local",le="30"} 39 -cpu_usage_system_bucket{cpu="cpu-total",host="local",le="40"} 39 -cpu_usage_system_bucket{cpu="cpu-total",host="local",le="50"} 54 -cpu_usage_system_bucket{cpu="cpu-total",host="local",le="60"} 54 -cpu_usage_system_bucket{cpu="cpu-total",host="local",le="70"} 54 -cpu_usage_system_bucket{cpu="cpu-total",host="local",le="80"} 54 -cpu_usage_system_bucket{cpu="cpu-total",host="local",le="90"} 54 -cpu_usage_system_bucket{cpu="cpu-total",host="local",le="100"} 54 -cpu_usage_system_bucket{cpu="cpu-total",host="local",le="+Inf"} 54 +cpu,cpu=cpu1,host=localhost,le=0.0 usage_idle_bucket=0i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=10.0 usage_idle_bucket=0i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=20.0 usage_idle_bucket=1i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=30.0 usage_idle_bucket=2i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=40.0 usage_idle_bucket=2i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=50.0 usage_idle_bucket=2i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=60.0 usage_idle_bucket=2i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=70.0 usage_idle_bucket=2i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=80.0 usage_idle_bucket=2i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=90.0 usage_idle_bucket=2i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=100.0 usage_idle_bucket=2i 1486998330000000000 +cpu,cpu=cpu1,host=localhost,le=+Inf usage_idle_bucket=2i 1486998330000000000 ``` diff --git a/plugins/aggregators/histogram/histogram.go b/plugins/aggregators/histogram/histogram.go index e241495ae2d7b..e058ef31a7442 100644 --- a/plugins/aggregators/histogram/histogram.go +++ b/plugins/aggregators/histogram/histogram.go @@ -27,7 +27,7 @@ type HistogramAggregator struct { // config is the config, which contains name, field of metric and histogram buckets. type config struct { Metric string `toml:"metric_name"` - Field string `toml:"metric_field"` + Fields []string `toml:"metric_fields"` Buckets buckets `toml:"buckets"` } @@ -82,9 +82,9 @@ var sampleConfig = ` # ## The set of buckets. # buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] # ## The name of metric. 
- # metric_name = "system" - # ## The concrete field of metric - # metric_field = "load1" + # metric_name = "diskio" + # ## The concrete fields of metric + # metric_fields = ["io_time", "read_time", "write_time"] ` // SampleConfig returns sample of config @@ -121,9 +121,7 @@ func (h *HistogramAggregator) Add(in telegraf.Metric) { if value, ok := convert(value); ok { index := sort.SearchFloat64s(buckets, value) - if index < len(agr.histogramCollection[field]) { - agr.histogramCollection[field][index]++ - } + agr.histogramCollection[field][index]++ } } @@ -177,7 +175,12 @@ func (h *HistogramAggregator) getBuckets(metric string, field string) []float64 } for _, config := range h.Configs { - if config.Metric == metric && (config.Field == "" || config.Field == field) { + if config.Metric == metric { + if !isBucketExists(field, config) { + continue + } + + if _, ok := h.buckets[metric]; !ok { h.buckets[metric] = make(bucketsByFields) } @@ -191,6 +194,21 @@ func (h *HistogramAggregator) getBuckets(metric string, field string) []float64 return h.buckets[metric][field] } +// isBucketExists checks if buckets exists for the passed field +func isBucketExists(field string, cfg config) bool { + if len(cfg.Fields) == 0 { + return true + } + + for _, fl := range cfg.Fields { + if fl == field { + return true + } + } + + return false +} + // addFields adds the field with specified tags to accumulator func addFields(acc telegraf.Accumulator, agr metricHistogramCollection, field string, bucketTagVal string, count uint64) { fields := map[string]interface{}{field + "_bucket": count} diff --git a/plugins/aggregators/histogram/histogram_test.go b/plugins/aggregators/histogram/histogram_test.go index 461e2079bfd90..22f68db61627c 100644 --- a/plugins/aggregators/histogram/histogram_test.go +++ b/plugins/aggregators/histogram/histogram_test.go @@ -67,7 +67,7 @@ func BenchmarkApply(b *testing.B) { // TestHistogramWithPeriodAndOneField tests metrics for one period and for one field func TestHistogramWithPeriodAndOneField(t *testing.T) { var cfg []config - cfg = append(cfg, config{Metric: "first_metric_name", Field: "a", Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) + cfg = append(cfg, config{Metric: "first_metric_name", Fields: []string{"a"}, Buckets: []float64{0.0, 10.0, 20.0, 30.0, 40.0}}) histogram := NewTestHistogram(cfg) acc := &testutil.Accumulator{} From 232c4328cc334b5384776052e4f2228fca2422b1 Mon Sep 17 00:00:00 2001 From: vlamug Date: Tue, 14 Feb 2017 16:45:15 +0300 Subject: [PATCH 05/19] The using uint64 instead int64. The checking of overflow of bucket hits was deleted. 
--- plugins/aggregators/histogram/histogram.go | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/plugins/aggregators/histogram/histogram.go b/plugins/aggregators/histogram/histogram.go index e058ef31a7442..a050d1847439c 100644 --- a/plugins/aggregators/histogram/histogram.go +++ b/plugins/aggregators/histogram/histogram.go @@ -2,7 +2,6 @@ package histogram import ( "fmt" - "math" "sort" "strconv" @@ -48,7 +47,7 @@ type metricHistogramCollection struct { } // counts is the number of hits in the bucket -type counts []uint64 +type counts []int64 // NewHistogramAggregator creates new histogram aggregator func NewHistogramAggregator() telegraf.Aggregator { @@ -130,13 +129,11 @@ func (h *HistogramAggregator) Add(in telegraf.Metric) { // Push returns histogram values for metrics func (h *HistogramAggregator) Push(acc telegraf.Accumulator) { - var isResetNeeded = false - for _, aggregate := range h.cache { for field, counts := range aggregate.histogramCollection { buckets := h.getBuckets(aggregate.metric, field) - count := uint64(0) + count := int64(0) for index, bucket := range buckets { count += counts[index] @@ -146,17 +143,8 @@ func (h *HistogramAggregator) Push(acc telegraf.Accumulator) { // the adding a value to the infinitive bucket count += counts[len(counts)-1] addFields(acc, aggregate, field, bucketInf, count) - - // if count is more than max int 64, then we flush all counts of buckets - if count > math.MaxInt64 { - isResetNeeded = true - } } } - - if isResetNeeded { - h.resetCache() - } } // Reset does nothing, because we need to collect counts for a long time, otherwise if config parameter 'reset' has @@ -210,7 +198,7 @@ func isBucketExists(field string, cfg config) bool { } // addFields adds the field with specified tags to accumulator -func addFields(acc telegraf.Accumulator, agr metricHistogramCollection, field string, bucketTagVal string, count uint64) { +func addFields(acc telegraf.Accumulator, agr metricHistogramCollection, field string, bucketTagVal string, count int64) { fields := map[string]interface{}{field + "_bucket": count} tags := map[string]string{} From 17ea04e713b50d6eff8ed256e25660d17bc6bc47 Mon Sep 17 00:00:00 2001 From: vlamug Date: Tue, 14 Feb 2017 16:56:07 +0300 Subject: [PATCH 06/19] The go fmt tool was applied. --- plugins/aggregators/histogram/histogram.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/plugins/aggregators/histogram/histogram.go b/plugins/aggregators/histogram/histogram.go index a050d1847439c..e9ad14aedf0f9 100644 --- a/plugins/aggregators/histogram/histogram.go +++ b/plugins/aggregators/histogram/histogram.go @@ -25,9 +25,9 @@ type HistogramAggregator struct { // config is the config, which contains name, field of metric and histogram buckets. type config struct { - Metric string `toml:"metric_name"` - Fields []string `toml:"metric_fields"` - Buckets buckets `toml:"buckets"` + Metric string `toml:"metric_name"` + Fields []string `toml:"metric_fields"` + Buckets buckets `toml:"buckets"` } // bucketsByMetrics contains the buckets grouped by metric and field name @@ -168,7 +168,6 @@ func (h *HistogramAggregator) getBuckets(metric string, field string) []float64 continue } - if _, ok := h.buckets[metric]; !ok { h.buckets[metric] = make(bucketsByFields) } From 79ea10fb759abc3e223afc1e635ef122d7baa0cd Mon Sep 17 00:00:00 2001 From: vlamug Date: Tue, 14 Feb 2017 16:59:24 +0300 Subject: [PATCH 07/19] The tests were fixed. 
--- .../aggregators/histogram/histogram_test.go | 106 +++++++++--------- 1 file changed, 53 insertions(+), 53 deletions(-) diff --git a/plugins/aggregators/histogram/histogram_test.go b/plugins/aggregators/histogram/histogram_test.go index 22f68db61627c..6a80210752847 100644 --- a/plugins/aggregators/histogram/histogram_test.go +++ b/plugins/aggregators/histogram/histogram_test.go @@ -80,12 +80,12 @@ func TestHistogramWithPeriodAndOneField(t *testing.T) { assert.Fail(t, "Incorrect number of metrics") } - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(0), "0.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(0), "10.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "20.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "30.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "40.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), bucketInf) + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(0), "0.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(0), "10.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "20.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "30.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "40.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), bucketInf) } // TestHistogramWithPeriodAndAllFields tests two metrics for one period and for all fields @@ -106,40 +106,40 @@ func TestHistogramWithPeriodAndAllFields(t *testing.T) { assert.Fail(t, "Incorrect number of metrics") } - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(0), "0.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(1), "15.5") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "20.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "30.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "40.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), bucketInf) - - assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", uint64(0), "0.0") - assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", uint64(0), "15.5") - assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", uint64(0), "20.0") - assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", uint64(0), "30.0") - assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", uint64(1), "40.0") - assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", uint64(1), bucketInf) - - assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", uint64(0), "0.0") - assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", uint64(0), "4.0") - assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", uint64(0), "10.0") - assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", uint64(0), "23.0") - assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", uint64(0), "30.0") - assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", uint64(1), bucketInf) - - assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", uint64(0), "0.0") - assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", 
uint64(0), "4.0") - assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", uint64(0), "10.0") - assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", uint64(0), "23.0") - assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", uint64(0), "30.0") - assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", uint64(0), bucketInf) - - assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", uint64(0), "0.0") - assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", uint64(0), "4.0") - assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", uint64(0), "10.0") - assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", uint64(0), "23.0") - assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", uint64(0), "30.0") - assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", uint64(0), bucketInf) + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(0), "0.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(1), "15.5") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "20.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "30.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "40.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), bucketInf) + + assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", int64(0), "0.0") + assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", int64(0), "15.5") + assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", int64(0), "20.0") + assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", int64(0), "30.0") + assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", int64(1), "40.0") + assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", int64(1), bucketInf) + + assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", int64(0), "0.0") + assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", int64(0), "4.0") + assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", int64(0), "10.0") + assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", int64(0), "23.0") + assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", int64(0), "30.0") + assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", int64(1), bucketInf) + + assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", int64(0), "0.0") + assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", int64(0), "4.0") + assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", int64(0), "10.0") + assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", int64(0), "23.0") + assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", int64(0), "30.0") + assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", int64(0), bucketInf) + + assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", int64(0), "0.0") + assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", int64(0), "4.0") + assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", int64(0), "10.0") + assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", int64(0), "23.0") + assertContainsTaggedField(t, acc, "second_metric_name", 
"andme_bucket", int64(0), "30.0") + assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", int64(0), bucketInf) } // TestHistogramDifferentPeriodsAndAllFields tests two metrics getting added with a push/reset in between (simulates @@ -154,23 +154,23 @@ func TestHistogramDifferentPeriodsAndAllFields(t *testing.T) { histogram.Add(firstMetric1) histogram.Push(acc) - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(0), "0.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(0), "10.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(1), "20.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(1), "30.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(1), "40.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(1), bucketInf) + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(0), "0.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(0), "10.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(1), "20.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(1), "30.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(1), "40.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(1), bucketInf) acc.ClearMetrics() histogram.Add(firstMetric2) histogram.Push(acc) - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(0), "0.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(0), "10.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "20.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "30.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), "40.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", uint64(2), bucketInf) + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(0), "0.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(0), "10.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "20.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "30.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "40.0") + assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), bucketInf) } // TestWrongBucketsOrder tests the calling panic with incorrect order of buckets @@ -192,7 +192,7 @@ func TestWrongBucketsOrder(t *testing.T) { } // assertContainsTaggedField is help functions to test histogram data -func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricName string, field string, counts uint64, le string) { +func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricName string, field string, counts int64, le string) { expectedFields := map[string]interface{}{} expectedFields[field] = counts From 30aa7f9fd4f5d482ebef335855a927d7431cb2cf Mon Sep 17 00:00:00 2001 From: vlamug Date: Wed, 15 Feb 2017 17:55:15 +0300 Subject: [PATCH 08/19] Added the additional information about buckets to README.md. 
--- plugins/aggregators/histogram/README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/plugins/aggregators/histogram/README.md b/plugins/aggregators/histogram/README.md index 247c128e48737..0898dd959fa7c 100644 --- a/plugins/aggregators/histogram/README.md +++ b/plugins/aggregators/histogram/README.md @@ -16,6 +16,11 @@ The each metric is passed to the aggregator and this aggregator searches histogr have been specified in the config. If buckets are found, the aggregator will put +1 to appropriate bucket. Otherwise, nothing will happen. Every `period` seconds these data will be pushed to output. +Note, that the all hits of current bucket will be also added to all next buckets in final result of distribution. +Why does it work this way? In configuration you define right borders for each bucket in a ascending sequence. +Internally buckets are presented as ranges with borders 0..bucketBorder: 0..1, 0..10, 0..50, …, 0..+Inf. +So the value "+1" will be put into those buckets, in which the metric value fell with such ranges of buckets. + Also, the algorithm of hit counting to buckets was implemented on the base of the algorithm, which is implemented in the Prometheus [client](https://github.com/prometheus/client_golang/blob/master/prometheus/histogram.go). From 2d94051abcc9e3806b3315502195ecbbcb1a18ab Mon Sep 17 00:00:00 2001 From: vlamug Date: Tue, 21 Feb 2017 15:27:51 +0300 Subject: [PATCH 09/19] Added item to Features section in the file CHANGELOG.md. --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a22beb437080a..4b83570e5a2f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -191,6 +191,7 @@ be deprecated eventually. - [#2732](https://github.com/influxdata/telegraf/pull/2732): Use go 1.8.1 - [#2712](https://github.com/influxdata/telegraf/issues/2712): Documentation for rabbitmq input plugin - [#2141](https://github.com/influxdata/telegraf/pull/2141): Logparser handles newly-created files. +- [#2387](https://github.com/influxdata/telegraf/pull/2387): Added histogram aggregator plugin. ### Bugfixes From af29572ac54dd27c7bd2d0e91ac3d1ddc0995cf9 Mon Sep 17 00:00:00 2001 From: vlamug Date: Thu, 11 May 2017 11:38:37 +0300 Subject: [PATCH 10/19] Changes by code-review. The refactoring of the "Add" method. The "metric" attribute of structure "metricHistogramCollection" was renamed to "name". The value of var "sampleConfig" was changed. Some changes were added to readme file. --- plugins/aggregators/histogram/README.md | 82 ++++++++++++--------- plugins/aggregators/histogram/histogram.go | 86 ++++++++++++---------- 2 files changed, 93 insertions(+), 75 deletions(-) diff --git a/plugins/aggregators/histogram/README.md b/plugins/aggregators/histogram/README.md index 0898dd959fa7c..b44cc02a22499 100644 --- a/plugins/aggregators/histogram/README.md +++ b/plugins/aggregators/histogram/README.md @@ -6,23 +6,30 @@ This plugin was added for ability to build histograms. #### Description -The histogram aggregator plugin aggregates values of specified metric\`s parameters. The metric is emitted every -`period` seconds. All you need to do is to specify borders of histogram buckets and parameters, for which you want to -aggregate histogram. +The histogram aggregator plugin aggregates values of specified metric's +fields. The metric is emitted every `period` seconds. All you need to do +is to specify borders of histogram buckets and fields, for which you want +to aggregate histogram. 
#### How it works -The each metric is passed to the aggregator and this aggregator searches histogram buckets for those parameters, which -have been specified in the config. If buckets are found, the aggregator will put +1 to appropriate bucket. -Otherwise, nothing will happen. Every `period` seconds these data will be pushed to output. +The each metric is passed to the aggregator and this aggregator searches +histogram buckets for those fields, which have been specified in the +config. If buckets are found, the aggregator will put +1 to appropriate +bucket. Otherwise, nothing will happen. Every `period` seconds these data +will be pushed to output. -Note, that the all hits of current bucket will be also added to all next buckets in final result of distribution. -Why does it work this way? In configuration you define right borders for each bucket in a ascending sequence. -Internally buckets are presented as ranges with borders 0..bucketBorder: 0..1, 0..10, 0..50, …, 0..+Inf. -So the value "+1" will be put into those buckets, in which the metric value fell with such ranges of buckets. +Note, that the all hits of current bucket will be also added to all next +buckets in final result of distribution. Why does it work this way? In +configuration you define right borders for each bucket in a ascending +sequence. Internally buckets are presented as ranges with borders +(0..bucketBorder]: 0..1, 0..10, 0..50, …, 0..+Inf. So the value "+1" will be +put into those buckets, in which the metric value fell with such ranges of +buckets. -Also, the algorithm of hit counting to buckets was implemented on the base of the algorithm, which is implemented in -the Prometheus [client](https://github.com/prometheus/client_golang/blob/master/prometheus/histogram.go). +Also, the algorithm of hit counting to buckets was implemented on the base +of the algorithm, which is implemented in the Prometheus +[client](https://github.com/prometheus/client_golang/blob/master/prometheus/histogram.go). ### Configuration @@ -38,40 +45,44 @@ the Prometheus [client](https://github.com/prometheus/client_golang/blob/master/ ## The example of config to aggregate histogram for all fields of specified metric. [[aggregators.histogram.config]] - ## The set of buckets. - buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0] - ## The name of metric. - metric_name = "cpu" + ## The set of buckets. + buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0] + ## The name of metric. + metric_name = "cpu" ## The example of config to aggregate histogram for concrete fields of specified metric. [[aggregators.histogram.config]] - ## The set of buckets. - buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] - ## The name of metric. - metric_name = "diskio" - ## The concrete fields of metric. - metric_fields = ["io_time", "read_time", "write_time"] + ## The set of buckets. + buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] + ## The name of metric. + metric_name = "diskio" + ## The concrete fields of metric. + metric_fields = ["io_time", "read_time", "write_time"] ``` #### Explanation -The field `metric_fields` is the list of metric parameters. For example, the metric `cpu` has the following parameters: -usage_user, usage_system, usage_idle, usage_nice, usage_iowait, usage_irq, usage_softirq, usage_steal, usage_guest, -usage_guest_nice. +The field `metric_fields` is the list of metric fields. 
For example, the +metric `cpu` has the following fields: usage_user, usage_system, +usage_idle, usage_nice, usage_iowait, usage_irq, usage_softirq, usage_steal, +usage_guest, usage_guest_nice. Note that histogram metrics will be pushed every `period` seconds. -As you know telegraf calls aggregator `Reset()` func each `period` seconds. Histogram aggregator ignores `Reset()` and continues to count hits. +As you know telegraf calls aggregator `Reset()` func each `period` seconds. +Histogram aggregator ignores `Reset()` and continues to count hits. #### Use cases -You can specify parameters using two cases: +You can specify fields using two cases: - 1. The specifying only metric name. In this case all parameters of metric will be aggregated. - 2. The specifying metric name and concrete parameters. + 1. The specifying only metric name. In this case all fields of metric + will be aggregated. + 2. The specifying metric name and concrete field. #### Some rules - - The setting of each histogram must be in separate section with title `aggregators.histogram.config`. + - The setting of each histogram must be in separate section with title + `aggregators.histogram.config`. - The each value of bucket must be float value. @@ -79,7 +90,7 @@ You can specify parameters using two cases: ### Measurements & Fields: -The postfix `bucket` will be added to each parameter. +The postfix `bucket` will be added to each field. - measurement1 - field1_bucket @@ -87,10 +98,11 @@ The postfix `bucket` will be added to each parameter. ### Tags: -All measurements have tag `le`. This tag has the border value of bucket. It means that the metric value is less or equal -to the value of this tag. For example, let assume that we have the metric value 10 and the following buckets: -[5, 10, 30, 70, 100]. Then the tag `le` will have the value 10, because the metrics value is passed into bucket with -right border value `10`. +All measurements have tag `le`. This tag has the border value of bucket. It +means that the metric value is less or equal to the value of this tag. For +example, let assume that we have the metric value 10 and the following +buckets: [5, 10, 30, 70, 100]. Then the tag `le` will have the value 10, +because the metrics value is passed into bucket with right border value `10`. ### Example Output: diff --git a/plugins/aggregators/histogram/histogram.go b/plugins/aggregators/histogram/histogram.go index e9ad14aedf0f9..ed61c66a5df52 100644 --- a/plugins/aggregators/histogram/histogram.go +++ b/plugins/aggregators/histogram/histogram.go @@ -42,7 +42,7 @@ type buckets []float64 // metricHistogramCollection aggregates the histogram data type metricHistogramCollection struct { histogramCollection map[string]counts - metric string + name string tags map[string]string } @@ -58,32 +58,29 @@ func NewHistogramAggregator() telegraf.Aggregator { return h } -// sampleConfig is config sample of histogram aggregation plugin var sampleConfig = ` - # # Configuration for aggregate histogram metrics - # [[aggregators.histogram]] - # ## General Aggregator Arguments: - # ## The period on which to flush & clear the aggregator. - # period = "30s" - # ## If true, the original metric will be dropped by the - # ## aggregator and will not get sent to the output plugins. - # drop_original = false - # - # ## The example of config to aggregate histogram for all fields of specified metric. - # [[aggregators.histogram.config]] - # ## The set of buckets. - # buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0] - # ## The name of metric. 
- # metric_name = "cpu" - # - # ## The example of config to aggregate for specified fields of metric. - # [[aggregators.histogram.config]] - # ## The set of buckets. - # buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] - # ## The name of metric. - # metric_name = "diskio" - # ## The concrete fields of metric - # metric_fields = ["io_time", "read_time", "write_time"] + ## General Aggregator Arguments: + ## The period on which to flush & clear the aggregator. + period = "30s" + ## If true, the original metric will be dropped by the + ## aggregator and will not get sent to the output plugins. + drop_original = false + + ## The example of config to aggregate histogram for all fields of specified metric. + [[aggregators.histogram.config]] + ## The set of buckets. + buckets = [0.0, 15.6, 34.5, 49.1, 71.5, 80.5, 94.5, 100.0] + ## The name of metric. + metric_name = "cpu" + + ## The example of config to aggregate for specified fields of metric. + [[aggregators.histogram.config]] + ## The set of buckets. + buckets = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0] + ## The name of metric. + metric_name = "diskio" + ## The concrete fields of metric + metric_fields = ["io_time", "read_time", "write_time"] ` // SampleConfig returns sample of config @@ -98,29 +95,38 @@ func (h *HistogramAggregator) Description() string { // Add adds new hit to the buckets func (h *HistogramAggregator) Add(in telegraf.Metric) { + var bucketsByField = make(map[string][]float64) + for field := range in.Fields() { + buckets := h.getBuckets(in.Name(), field) + if buckets != nil { + bucketsByField[field] = buckets + } + } + + if len(bucketsByField) == 0 { + return + } + id := in.HashID() agr, ok := h.cache[id] if !ok { agr = metricHistogramCollection{ - metric: in.Name(), + name: in.Name(), tags: in.Tags(), histogramCollection: make(map[string]counts), } } for field, value := range in.Fields() { - buckets := h.getBuckets(in.Name(), field) - if buckets == nil { - continue - } - - if agr.histogramCollection[field] == nil { - agr.histogramCollection[field] = make(counts, len(buckets)+1) - } + if buckets, ok := bucketsByField[field]; ok { + if agr.histogramCollection[field] == nil { + agr.histogramCollection[field] = make(counts, len(buckets)+1) + } - if value, ok := convert(value); ok { - index := sort.SearchFloat64s(buckets, value) - agr.histogramCollection[field][index]++ + if value, ok := convert(value); ok { + index := sort.SearchFloat64s(buckets, value) + agr.histogramCollection[field][index]++ + } } } @@ -132,7 +138,7 @@ func (h *HistogramAggregator) Push(acc telegraf.Accumulator) { for _, aggregate := range h.cache { for field, counts := range aggregate.histogramCollection { - buckets := h.getBuckets(aggregate.metric, field) + buckets := h.getBuckets(aggregate.name, field) count := int64(0) for index, bucket := range buckets { @@ -206,7 +212,7 @@ func addFields(acc telegraf.Accumulator, agr metricHistogramCollection, field st } tags[bucketTag] = bucketTagVal - acc.AddFields(agr.metric, fields, tags) + acc.AddFields(agr.name, fields, tags) } // checkOrder checks the order of buckets, so that the current value must be more than previous value From 023f3f6abac726431c55b02067158ce8331482bf Mon Sep 17 00:00:00 2001 From: vlamug Date: Thu, 11 May 2017 17:45:42 +0300 Subject: [PATCH 11/19] Changes by code-review. Part 2. Refactoring of the method getBuckets(). Using sortBuckets method to sort buckets, if it is needed. 
--- plugins/aggregators/histogram/README.md | 4 +++ plugins/aggregators/histogram/histogram.go | 33 +++++++--------------- 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/plugins/aggregators/histogram/README.md b/plugins/aggregators/histogram/README.md index b44cc02a22499..29b7a6dc0dd79 100644 --- a/plugins/aggregators/histogram/README.md +++ b/plugins/aggregators/histogram/README.md @@ -27,6 +27,10 @@ sequence. Internally buckets are presented as ranges with borders put into those buckets, in which the metric value fell with such ranges of buckets. +This plugin creates cumulative histograms. It means, that the hits in the +buckets will always increase from the moment of telegraf start. But if you +restart telegraf, all hits in the buckets will be reset to 0. + Also, the algorithm of hit counting to buckets was implemented on the base of the algorithm, which is implemented in the Prometheus [client](https://github.com/prometheus/client_golang/blob/master/prometheus/histogram.go). diff --git a/plugins/aggregators/histogram/histogram.go b/plugins/aggregators/histogram/histogram.go index ed61c66a5df52..4821859b6f163 100644 --- a/plugins/aggregators/histogram/histogram.go +++ b/plugins/aggregators/histogram/histogram.go @@ -1,7 +1,6 @@ package histogram import ( - "fmt" "sort" "strconv" @@ -162,7 +161,7 @@ func (h *HistogramAggregator) resetCache() { h.cache = make(map[uint64]metricHistogramCollection) } -// checkAndGetBuckets checks the order of buckets and returns them. +// getBuckets finds buckets and returns them func (h *HistogramAggregator) getBuckets(metric string, field string) []float64 { if buckets, ok := h.buckets[metric][field]; ok { return buckets @@ -178,9 +177,7 @@ func (h *HistogramAggregator) getBuckets(metric string, field string) []float64 h.buckets[metric] = make(bucketsByFields) } - checkOrder(config.Buckets, metric, field) - - h.buckets[metric][field] = config.Buckets + h.buckets[metric][field] = sortBuckets(config.Buckets) } } @@ -204,30 +201,20 @@ func isBucketExists(field string, cfg config) bool { // addFields adds the field with specified tags to accumulator func addFields(acc telegraf.Accumulator, agr metricHistogramCollection, field string, bucketTagVal string, count int64) { - fields := map[string]interface{}{field + "_bucket": count} - - tags := map[string]string{} - for key, val := range agr.tags { - tags[key] = val - } - tags[bucketTag] = bucketTagVal - - acc.AddFields(agr.name, fields, tags) + agr.tags[bucketTag] = bucketTagVal + acc.AddFields(agr.name, map[string]interface{}{field + "_bucket": count}, agr.tags) } -// checkOrder checks the order of buckets, so that the current value must be more than previous value -func checkOrder(buckets []float64, metric string, field string) { +// sortBuckets sorts the buckets if it is needed +func sortBuckets(buckets []float64) []float64 { for i, bucket := range buckets { if i < len(buckets)-1 && bucket >= buckets[i+1] { - panic(fmt.Errorf( - "histogram buckets must be in increasing order: %.2f >= %.2f, metrics: %s, field: %s", - bucket, - buckets[i+1], - metric, - field, - )) + sort.Float64s(buckets) + break } } + + return buckets } // convert converts interface to concrete type From ca40497ff62ae1f1be3c6fd40ddbdff618034910 Mon Sep 17 00:00:00 2001 From: vlamug Date: Thu, 11 May 2017 19:25:28 +0300 Subject: [PATCH 12/19] Using copying of tags in the method: addFields --- plugins/aggregators/histogram/histogram.go | 11 +++++++++-- plugins/aggregators/histogram/histogram_test.go | 10 +++++----- 2 
files changed, 14 insertions(+), 7 deletions(-) diff --git a/plugins/aggregators/histogram/histogram.go b/plugins/aggregators/histogram/histogram.go index 4821859b6f163..b0f8b232f51c2 100644 --- a/plugins/aggregators/histogram/histogram.go +++ b/plugins/aggregators/histogram/histogram.go @@ -201,8 +201,15 @@ func isBucketExists(field string, cfg config) bool { // addFields adds the field with specified tags to accumulator func addFields(acc telegraf.Accumulator, agr metricHistogramCollection, field string, bucketTagVal string, count int64) { - agr.tags[bucketTag] = bucketTagVal - acc.AddFields(agr.name, map[string]interface{}{field + "_bucket": count}, agr.tags) + fields := map[string]interface{}{field + "_bucket": count} + + tags := map[string]string{} + for key, val := range agr.tags { + tags[key] = val + } + tags[bucketTag] = bucketTagVal + + acc.AddFields(agr.name, fields, tags) } // sortBuckets sorts the buckets if it is needed diff --git a/plugins/aggregators/histogram/histogram_test.go b/plugins/aggregators/histogram/histogram_test.go index 6a80210752847..96fb2a420a399 100644 --- a/plugins/aggregators/histogram/histogram_test.go +++ b/plugins/aggregators/histogram/histogram_test.go @@ -199,17 +199,17 @@ func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricNa acc.Lock() defer acc.Unlock() - for _, metric := range acc.Metrics { - if metric.Measurement != metricName { + for _, item := range acc.Metrics { + if item.Measurement != metricName { continue } - if _, ok := metric.Fields[field]; !ok { + if _, ok := item.Fields[field]; !ok { continue } - if metric.Tags[bucketTag] == le { - if assert.Equal(t, expectedFields, metric.Fields) { + if item.Tags[bucketTag] == le { + if assert.Equal(t, expectedFields, item.Fields) { return } From d69f2456d9b3eb6b380fe0b6c0509d6c0a85cc02 Mon Sep 17 00:00:00 2001 From: vlamug Date: Thu, 11 May 2017 19:55:33 +0300 Subject: [PATCH 13/19] Some small changes in tests. --- plugins/aggregators/histogram/histogram_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/plugins/aggregators/histogram/histogram_test.go b/plugins/aggregators/histogram/histogram_test.go index 96fb2a420a399..6a80210752847 100644 --- a/plugins/aggregators/histogram/histogram_test.go +++ b/plugins/aggregators/histogram/histogram_test.go @@ -199,17 +199,17 @@ func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricNa acc.Lock() defer acc.Unlock() - for _, item := range acc.Metrics { - if item.Measurement != metricName { + for _, metric := range acc.Metrics { + if metric.Measurement != metricName { continue } - if _, ok := item.Fields[field]; !ok { + if _, ok := metric.Fields[field]; !ok { continue } - if item.Tags[bucketTag] == le { - if assert.Equal(t, expectedFields, item.Fields) { + if metric.Tags[bucketTag] == le { + if assert.Equal(t, expectedFields, metric.Fields) { return } From d902a12c116b5ab33894676e5b5dddc32e1e962e Mon Sep 17 00:00:00 2001 From: vmugultyanov Date: Fri, 23 Jun 2017 12:25:11 +0300 Subject: [PATCH 14/19] The converting of bucket to string was fixed. Now the precision is fully used. 
--- plugins/aggregators/histogram/histogram.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/aggregators/histogram/histogram.go b/plugins/aggregators/histogram/histogram.go index b0f8b232f51c2..7276d723f76dd 100644 --- a/plugins/aggregators/histogram/histogram.go +++ b/plugins/aggregators/histogram/histogram.go @@ -142,7 +142,7 @@ func (h *HistogramAggregator) Push(acc telegraf.Accumulator) { for index, bucket := range buckets { count += counts[index] - addFields(acc, aggregate, field, strconv.FormatFloat(bucket, 'f', 1, 64), count) + addFields(acc, aggregate, field, strconv.FormatFloat(bucket, 'f', -1, 64), count) } // the adding a value to the infinitive bucket From 2873efcd7374685e4fcc9af58b6ed65bca610592 Mon Sep 17 00:00:00 2001 From: vlamug Date: Thu, 20 Jul 2017 18:32:00 +0300 Subject: [PATCH 15/19] The grouping metric fields by value was added. --- plugins/aggregators/histogram/histogram.go | 108 ++++++++++++++++----- 1 file changed, 85 insertions(+), 23 deletions(-) diff --git a/plugins/aggregators/histogram/histogram.go b/plugins/aggregators/histogram/histogram.go index 7276d723f76dd..9042644cb3117 100644 --- a/plugins/aggregators/histogram/histogram.go +++ b/plugins/aggregators/histogram/histogram.go @@ -48,6 +48,13 @@ type metricHistogramCollection struct { // counts is the number of hits in the bucket type counts []int64 +type groupedByCountFields struct { + name string + fields []string + tags map[string]string + count int64 +} + // NewHistogramAggregator creates new histogram aggregator func NewHistogramAggregator() telegraf.Aggregator { h := &HistogramAggregator{} @@ -94,7 +101,7 @@ func (h *HistogramAggregator) Description() string { // Add adds new hit to the buckets func (h *HistogramAggregator) Add(in telegraf.Metric) { - var bucketsByField = make(map[string][]float64) + bucketsByField := make(map[string][]float64) for field := range in.Fields() { buckets := h.getBuckets(in.Name(), field) if buckets != nil { @@ -134,22 +141,58 @@ func (h *HistogramAggregator) Add(in telegraf.Metric) { // Push returns histogram values for metrics func (h *HistogramAggregator) Push(acc telegraf.Accumulator) { + metricsWithGroupedFields := []groupedByCountFields{} + for _, aggregate := range h.cache { for field, counts := range aggregate.histogramCollection { + h.groupFieldsByCount(&metricsWithGroupedFields, aggregate.name, field, copyTags(aggregate.tags), counts) + } + } - buckets := h.getBuckets(aggregate.name, field) - count := int64(0) + for _, metric := range metricsWithGroupedFields { + acc.AddFields(metric.name, makeFieldsWithCount(metric.fields, metric.count), metric.tags) + } +} - for index, bucket := range buckets { - count += counts[index] - addFields(acc, aggregate, field, strconv.FormatFloat(bucket, 'f', -1, 64), count) - } +func (h *HistogramAggregator) groupFieldsByCount( + metricsWithGroupedFields *[]groupedByCountFields, + name string, + field string, + tags map[string]string, + counts []int64, +) { + count := int64(0) + for index, bucket := range h.getBuckets(name, field) { + count += counts[index] + + tags[bucketTag] = strconv.FormatFloat(bucket, 'f', -1, 64) + h.groupField(metricsWithGroupedFields, name, field, count, copyTags(tags)) + } - // the adding a value to the infinitive bucket - count += counts[len(counts)-1] - addFields(acc, aggregate, field, bucketInf, count) + count += counts[len(counts)-1] + tags[bucketTag] = bucketInf + + h.groupField(metricsWithGroupedFields, name, field, count, tags) +} + +func (h *HistogramAggregator) 
groupField( + metricsWithGroupedFields *[]groupedByCountFields, + name string, + field string, + count int64, + tags map[string]string, +) { + for key, metric := range *metricsWithGroupedFields { + if count == metric.count && isTagsIdentical(tags, metric.tags) { + (*metricsWithGroupedFields)[key].fields = append(metric.fields, field) + return } } + + *metricsWithGroupedFields = append( + *metricsWithGroupedFields, + groupedByCountFields{name: name, fields: []string{field}, count: count, tags: tags}, + ) } // Reset does nothing, because we need to collect counts for a long time, otherwise if config parameter 'reset' has @@ -199,19 +242,6 @@ func isBucketExists(field string, cfg config) bool { return false } -// addFields adds the field with specified tags to accumulator -func addFields(acc telegraf.Accumulator, agr metricHistogramCollection, field string, bucketTagVal string, count int64) { - fields := map[string]interface{}{field + "_bucket": count} - - tags := map[string]string{} - for key, val := range agr.tags { - tags[key] = val - } - tags[bucketTag] = bucketTagVal - - acc.AddFields(agr.name, fields, tags) -} - // sortBuckets sorts the buckets if it is needed func sortBuckets(buckets []float64) []float64 { for i, bucket := range buckets { @@ -236,6 +266,38 @@ func convert(in interface{}) (float64, bool) { } } +func copyTags(tags map[string]string) map[string]string { + copiedTags := map[string]string{} + for key, val := range tags { + copiedTags[key] = val + } + + return copiedTags +} + +func isTagsIdentical(originalTags, checkedTags map[string]string) bool { + if len(originalTags) != len(checkedTags) { + return false + } + + for tagName, tagValue := range originalTags { + if tagValue != checkedTags[tagName] { + return false + } + } + + return true +} + +func makeFieldsWithCount(fields []string, count int64) map[string]interface{} { + fieldsWithCount := map[string]interface{}{} + for _, field := range fields { + fieldsWithCount[field+"_bucket"] = count + } + + return fieldsWithCount +} + // init initializes histogram aggregator plugin func init() { aggregators.Add("histogram", func() telegraf.Aggregator { From f4e38674c4eea7b895bc42243bbf5c6e0ee72afc Mon Sep 17 00:00:00 2001 From: vlamug Date: Fri, 21 Jul 2017 11:57:56 +0300 Subject: [PATCH 16/19] The unit test were fixed. 
--- plugins/aggregators/histogram/histogram.go | 8 +- .../aggregators/histogram/histogram_test.go | 134 +++++++++--------- 2 files changed, 75 insertions(+), 67 deletions(-) diff --git a/plugins/aggregators/histogram/histogram.go b/plugins/aggregators/histogram/histogram.go index 9042644cb3117..bdd266dcacd59 100644 --- a/plugins/aggregators/histogram/histogram.go +++ b/plugins/aggregators/histogram/histogram.go @@ -48,6 +48,7 @@ type metricHistogramCollection struct { // counts is the number of hits in the bucket type counts []int64 +// groupedByCountFields contains grouped fields by their count and fields values type groupedByCountFields struct { name string fields []string @@ -154,6 +155,7 @@ func (h *HistogramAggregator) Push(acc telegraf.Accumulator) { } } +// groupFieldsByCount groups fields by count value func (h *HistogramAggregator) groupFieldsByCount( metricsWithGroupedFields *[]groupedByCountFields, name string, @@ -175,6 +177,7 @@ func (h *HistogramAggregator) groupFieldsByCount( h.groupField(metricsWithGroupedFields, name, field, count, tags) } +// groupField groups field by count value func (h *HistogramAggregator) groupField( metricsWithGroupedFields *[]groupedByCountFields, name string, @@ -183,7 +186,7 @@ func (h *HistogramAggregator) groupField( tags map[string]string, ) { for key, metric := range *metricsWithGroupedFields { - if count == metric.count && isTagsIdentical(tags, metric.tags) { + if name == metric.name && count == metric.count && isTagsIdentical(tags, metric.tags) { (*metricsWithGroupedFields)[key].fields = append(metric.fields, field) return } @@ -266,6 +269,7 @@ func convert(in interface{}) (float64, bool) { } } +// copyTags copies tags func copyTags(tags map[string]string) map[string]string { copiedTags := map[string]string{} for key, val := range tags { @@ -275,6 +279,7 @@ func copyTags(tags map[string]string) map[string]string { return copiedTags } +// isTagsIdentical checks the identity of two list of tags func isTagsIdentical(originalTags, checkedTags map[string]string) bool { if len(originalTags) != len(checkedTags) { return false @@ -289,6 +294,7 @@ func isTagsIdentical(originalTags, checkedTags map[string]string) bool { return true } +// makeFieldsWithCount assigns count value to all metric fields func makeFieldsWithCount(fields []string, count int64) map[string]interface{} { fieldsWithCount := map[string]interface{}{} for _, field := range fields { diff --git a/plugins/aggregators/histogram/histogram_test.go b/plugins/aggregators/histogram/histogram_test.go index 6a80210752847..cd2d73d6a2a89 100644 --- a/plugins/aggregators/histogram/histogram_test.go +++ b/plugins/aggregators/histogram/histogram_test.go @@ -37,6 +37,7 @@ var firstMetric2, _ = metric.New( map[string]string{"tag_name": "tag_value"}, map[string]interface{}{ "a": float64(15.9), + "c": float64(40), }, time.Now(), ) @@ -79,13 +80,12 @@ func TestHistogramWithPeriodAndOneField(t *testing.T) { if len(acc.Metrics) != 6 { assert.Fail(t, "Incorrect number of metrics") } - - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(0), "0.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(0), "10.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "20.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "30.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "40.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), bucketInf) + 
assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "0") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0)}, "10") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "20") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "30") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "40") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, bucketInf) } // TestHistogramWithPeriodAndAllFields tests two metrics for one period and for all fields @@ -102,44 +102,30 @@ func TestHistogramWithPeriodAndAllFields(t *testing.T) { histogram.Add(secondMetric) histogram.Push(acc) - if len(acc.Metrics) != 30 { + if len(acc.Metrics) != 18 { assert.Fail(t, "Incorrect number of metrics") } - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(0), "0.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(1), "15.5") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "20.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "30.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "40.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), bucketInf) - - assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", int64(0), "0.0") - assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", int64(0), "15.5") - assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", int64(0), "20.0") - assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", int64(0), "30.0") - assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", int64(1), "40.0") - assertContainsTaggedField(t, acc, "first_metric_name", "b_bucket", int64(1), bucketInf) - - assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", int64(0), "0.0") - assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", int64(0), "4.0") - assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", int64(0), "10.0") - assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", int64(0), "23.0") - assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", int64(0), "30.0") - assertContainsTaggedField(t, acc, "second_metric_name", "a_bucket", int64(1), bucketInf) - - assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", int64(0), "0.0") - assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", int64(0), "4.0") - assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", int64(0), "10.0") - assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", int64(0), "23.0") - assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", int64(0), "30.0") - assertContainsTaggedField(t, acc, "second_metric_name", "ignoreme_bucket", int64(0), bucketInf) - - assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", int64(0), "0.0") - assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", int64(0), "4.0") - assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", int64(0), "10.0") - assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", int64(0), "23.0") - assertContainsTaggedField(t, acc, 
"second_metric_name", "andme_bucket", int64(0), "30.0") - assertContainsTaggedField(t, acc, "second_metric_name", "andme_bucket", int64(0), bucketInf) + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "0") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "15.5") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "20") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "30") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "40") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, bucketInf) + + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(0), "c_bucket": int64(0)}, "15.5") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(0), "c_bucket": int64(0)}, "20") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(0), "c_bucket": int64(0)}, "30") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(1), "c_bucket": int64(1)}, "40") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(1), "c_bucket": int64(1)}, bucketInf) + + assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "0") + assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "4") + assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "10") + assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "23") + assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "30") + assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(1)}, bucketInf) + assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, bucketInf) } // TestHistogramDifferentPeriodsAndAllFields tests two metrics getting added with a push/reset in between (simulates @@ -154,23 +140,29 @@ func TestHistogramDifferentPeriodsAndAllFields(t *testing.T) { histogram.Add(firstMetric1) histogram.Push(acc) - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(0), "0.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(0), "10.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(1), "20.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(1), "30.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(1), "40.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(1), bucketInf) + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0)}, "0") + assertContainsTaggedField(t, acc, 
"first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0)}, "10") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "20") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(0)}, "20") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "30") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(0)}, "30") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(1)}, "40") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(1)}, bucketInf) acc.ClearMetrics() histogram.Add(firstMetric2) histogram.Push(acc) - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(0), "0.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(0), "10.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "20.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "30.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), "40.0") - assertContainsTaggedField(t, acc, "first_metric_name", "a_bucket", int64(2), bucketInf) + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "0") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "10") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "20") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "30") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "40") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, bucketInf) + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(0), "c_bucket": int64(0)}, "20") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(0), "c_bucket": int64(0)}, "30") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(1), "c_bucket": int64(1)}, "40") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(1), "c_bucket": int64(1)}, bucketInf) } // TestWrongBucketsOrder tests the calling panic with incorrect order of buckets @@ -192,30 +184,40 @@ func TestWrongBucketsOrder(t *testing.T) { } // assertContainsTaggedField is help functions to test histogram data -func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricName string, field string, counts int64, le string) { - expectedFields := map[string]interface{}{} - expectedFields[field] = counts - +func assertContainsTaggedField(t *testing.T, acc *testutil.Accumulator, metricName string, fields map[string]interface{}, le string) { acc.Lock() defer acc.Unlock() - for _, metric := range acc.Metrics { - if metric.Measurement != metricName { + for _, checkedMetric := range acc.Metrics { + // check metric name + if checkedMetric.Measurement != metricName { continue } - if _, ok := metric.Fields[field]; !ok { + // check "le" tag + if 
checkedMetric.Tags[bucketTag] != le { continue } - if metric.Tags[bucketTag] == le { - if assert.Equal(t, expectedFields, metric.Fields) { - return + // check fields + isFieldsIdentical := true + for field := range fields { + if _, ok := checkedMetric.Fields[field]; !ok { + isFieldsIdentical = false + break } + } + if !isFieldsIdentical { + continue + } - assert.Fail(t, fmt.Sprintf("incorrect fields %v of metric %s", expectedFields, metricName)) + // check fields with their counts + if assert.Equal(t, fields, checkedMetric.Fields) { + return } + + assert.Fail(t, fmt.Sprintf("incorrect fields %v of metric %s", fields, metricName)) } - assert.Fail(t, fmt.Sprintf("unknown measurement %s with tags %v", metricName, []string{"tag_name", "le"})) + assert.Fail(t, fmt.Sprintf("unknown measurement '%s' with tags: %v, fields: %v", metricName, map[string]string{"le": le}, fields)) } From 36f356caf073bf42a0caade718e1a523066a736b Mon Sep 17 00:00:00 2001 From: vlamug Date: Fri, 21 Jul 2017 12:18:25 +0300 Subject: [PATCH 17/19] The changelog file was changed. --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b83570e5a2f4..3c3b2683693e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ - [#2571](https://github.com/influxdata/telegraf/pull/2571): Add read timeout to socket_listener - [#2612](https://github.com/influxdata/telegraf/pull/2612): Add input plugin for OpenLDAP. - [#3042](https://github.com/influxdata/telegraf/pull/3042): Add network option to dns_query. +- [#2387](https://github.com/influxdata/telegraf/pull/2387): Added histogram aggregator plugin. ### Bugfixes @@ -191,7 +192,6 @@ be deprecated eventually. - [#2732](https://github.com/influxdata/telegraf/pull/2732): Use go 1.8.1 - [#2712](https://github.com/influxdata/telegraf/issues/2712): Documentation for rabbitmq input plugin - [#2141](https://github.com/influxdata/telegraf/pull/2141): Logparser handles newly-created files. -- [#2387](https://github.com/influxdata/telegraf/pull/2387): Added histogram aggregator plugin. 
### Bugfixes From e8f4476e0feff5066ebc6515056cd5007e1df5b6 Mon Sep 17 00:00:00 2001 From: Alexander Tunik Date: Wed, 26 Jul 2017 15:12:13 +0300 Subject: [PATCH 18/19] Histogram aggregator now groups metric fields by bucket --- plugins/aggregators/histogram/histogram.go | 35 +++++++++-------- .../aggregators/histogram/histogram_test.go | 39 +++++++------------ 2 files changed, 32 insertions(+), 42 deletions(-) diff --git a/plugins/aggregators/histogram/histogram.go b/plugins/aggregators/histogram/histogram.go index bdd266dcacd59..49195533995c5 100644 --- a/plugins/aggregators/histogram/histogram.go +++ b/plugins/aggregators/histogram/histogram.go @@ -50,10 +50,9 @@ type counts []int64 // groupedByCountFields contains grouped fields by their count and fields values type groupedByCountFields struct { - name string - fields []string - tags map[string]string - count int64 + name string + tags map[string]string + fieldsWithCount map[string]int64 } // NewHistogramAggregator creates new histogram aggregator @@ -146,17 +145,17 @@ func (h *HistogramAggregator) Push(acc telegraf.Accumulator) { for _, aggregate := range h.cache { for field, counts := range aggregate.histogramCollection { - h.groupFieldsByCount(&metricsWithGroupedFields, aggregate.name, field, copyTags(aggregate.tags), counts) + h.groupFieldsByBuckets(&metricsWithGroupedFields, aggregate.name, field, copyTags(aggregate.tags), counts) } } for _, metric := range metricsWithGroupedFields { - acc.AddFields(metric.name, makeFieldsWithCount(metric.fields, metric.count), metric.tags) + acc.AddFields(metric.name, makeFieldsWithCount(metric.fieldsWithCount), metric.tags) } } -// groupFieldsByCount groups fields by count value -func (h *HistogramAggregator) groupFieldsByCount( +// groupFieldsByBuckets groups fields by metric buckets which are represented as tags +func (h *HistogramAggregator) groupFieldsByBuckets( metricsWithGroupedFields *[]groupedByCountFields, name string, field string, @@ -186,15 +185,19 @@ func (h *HistogramAggregator) groupField( tags map[string]string, ) { for key, metric := range *metricsWithGroupedFields { - if name == metric.name && count == metric.count && isTagsIdentical(tags, metric.tags) { - (*metricsWithGroupedFields)[key].fields = append(metric.fields, field) + if name == metric.name && isTagsIdentical(tags, metric.tags) { + (*metricsWithGroupedFields)[key].fieldsWithCount[field] = count return } } + fieldsWithCount := map[string]int64{ + field: count, + } + *metricsWithGroupedFields = append( *metricsWithGroupedFields, - groupedByCountFields{name: name, fields: []string{field}, count: count, tags: tags}, + groupedByCountFields{name: name, tags: tags, fieldsWithCount: fieldsWithCount}, ) } @@ -295,13 +298,13 @@ func isTagsIdentical(originalTags, checkedTags map[string]string) bool { } // makeFieldsWithCount assigns count value to all metric fields -func makeFieldsWithCount(fields []string, count int64) map[string]interface{} { - fieldsWithCount := map[string]interface{}{} - for _, field := range fields { - fieldsWithCount[field+"_bucket"] = count +func makeFieldsWithCount(fieldsWithCountIn map[string]int64) map[string]interface{} { + fieldsWithCountOut := map[string]interface{}{} + for field, count := range fieldsWithCountIn { + fieldsWithCountOut[field+"_bucket"] = count } - return fieldsWithCount + return fieldsWithCountOut } // init initializes histogram aggregator plugin diff --git a/plugins/aggregators/histogram/histogram_test.go b/plugins/aggregators/histogram/histogram_test.go index 
cd2d73d6a2a89..8c4a2b9d34620 100644 --- a/plugins/aggregators/histogram/histogram_test.go +++ b/plugins/aggregators/histogram/histogram_test.go @@ -102,30 +102,23 @@ func TestHistogramWithPeriodAndAllFields(t *testing.T) { histogram.Add(secondMetric) histogram.Push(acc) - if len(acc.Metrics) != 18 { + if len(acc.Metrics) != 12 { assert.Fail(t, "Incorrect number of metrics") } assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "0") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "15.5") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "20") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "30") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "40") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, bucketInf) - - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(0), "c_bucket": int64(0)}, "15.5") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(0), "c_bucket": int64(0)}, "20") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(0), "c_bucket": int64(0)}, "30") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(1), "c_bucket": int64(1)}, "40") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(1), "c_bucket": int64(1)}, bucketInf) + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(0), "c_bucket": int64(0)}, "15.5") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "20") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "30") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, "40") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, bucketInf) assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "0") assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "4") assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "10") assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "23") assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(0), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, "30") - assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(1)}, bucketInf) - assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"ignoreme_bucket": int64(0), 
"andme_bucket": int64(0)}, bucketInf) + assertContainsTaggedField(t, acc, "second_metric_name", map[string]interface{}{"a_bucket": int64(1), "ignoreme_bucket": int64(0), "andme_bucket": int64(0)}, bucketInf) } // TestHistogramDifferentPeriodsAndAllFields tests two metrics getting added with a push/reset in between (simulates @@ -142,10 +135,8 @@ func TestHistogramDifferentPeriodsAndAllFields(t *testing.T) { assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0)}, "0") assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0)}, "10") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "20") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(0)}, "20") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1)}, "30") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(0)}, "30") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(0)}, "20") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(0)}, "30") assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(1)}, "40") assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(1), "b_bucket": int64(1)}, bucketInf) @@ -155,14 +146,10 @@ func TestHistogramDifferentPeriodsAndAllFields(t *testing.T) { assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "0") assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(0), "b_bucket": int64(0), "c_bucket": int64(0)}, "10") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "20") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "30") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, "40") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2)}, bucketInf) - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(0), "c_bucket": int64(0)}, "20") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(0), "c_bucket": int64(0)}, "30") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(1), "c_bucket": int64(1)}, "40") - assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"b_bucket": int64(1), "c_bucket": int64(1)}, bucketInf) + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "20") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(0), "c_bucket": int64(0)}, "30") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), "b_bucket": int64(1), "c_bucket": int64(1)}, "40") + assertContainsTaggedField(t, acc, "first_metric_name", map[string]interface{}{"a_bucket": int64(2), 
"b_bucket": int64(1), "c_bucket": int64(1)}, bucketInf) } // TestWrongBucketsOrder tests the calling panic with incorrect order of buckets From 700c52b10ec9d36860e997dea02beb9cd2ca813a Mon Sep 17 00:00:00 2001 From: vlamug Date: Fri, 28 Jul 2017 11:28:39 +0300 Subject: [PATCH 19/19] The info about histogram plugin was deleted from CHANGELOG file. --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c3b2683693e3..a22beb437080a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,7 +44,6 @@ - [#2571](https://github.com/influxdata/telegraf/pull/2571): Add read timeout to socket_listener - [#2612](https://github.com/influxdata/telegraf/pull/2612): Add input plugin for OpenLDAP. - [#3042](https://github.com/influxdata/telegraf/pull/3042): Add network option to dns_query. -- [#2387](https://github.com/influxdata/telegraf/pull/2387): Added histogram aggregator plugin. ### Bugfixes