Skip to content

Commit

Permalink
Optimize SeriesGrouper & aggregators.merge
Browse files Browse the repository at this point in the history
The previous implementation of SeriesGrouper required breaking a metric object apart into its constituents, converting tags and keys into unoptimized maps, only to have it put them back together into another metric object. This resulted in a significant performance overhead. This overhead was further compounded when the number of fields was large.

This change adds a new AddMetric method to SeriesGrouper which preserves the metric object and removes the back-and-forth conversion.

Additionlly the method used for calculating the metric's hash was switched to use maphash, which is optimized for this case.

----

Benchmarks

Before:

    BenchmarkMergeOne-16          106012	     11790 ns/op
    BenchmarkMergeTwo-16           48529	     24819 ns/op
    BenchmarkGroupID-16           780018	      1608 ns/op

After:

    BenchmarkMergeOne-16          907093	      1173 ns/op
    BenchmarkMergeTwo-16          508321	      2168 ns/op
    BenchmarkGroupID-16         11217788	      99.4 ns/op
  • Loading branch information
phemmer committed Nov 11, 2020
1 parent d369003 commit dc0648c
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 29 deletions.
66 changes: 45 additions & 21 deletions metric/series_grouper.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package metric

import (
"hash/fnv"
"io"
"encoding/binary"
"hash/maphash"
"sort"
"strconv"
"time"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -41,8 +40,15 @@ func (g *SeriesGrouper) Add(
field string,
fieldValue interface{},
) error {
taglist := make([]*telegraf.Tag, 0, len(tags))
for k, v := range tags {
taglist = append(taglist,
&telegraf.Tag{Key: k, Value: v})
}
sort.Slice(taglist, func(i, j int) bool { return taglist[i].Key < taglist[j].Key })

var err error
id := groupID(measurement, tags, tm)
id := groupID(measurement, taglist, tm)
metric := g.metrics[id]
if metric == nil {
metric, err = New(measurement, tags, map[string]interface{}{field: fieldValue}, tm)
Expand All @@ -57,30 +63,48 @@ func (g *SeriesGrouper) Add(
return nil
}

// AddMetric adds a metric to the series, merging with any previous matching metrics.
func (g *SeriesGrouper) AddMetric(
metric telegraf.Metric,
) {
id := groupID(metric.Name(), metric.TagList(), metric.Time())
m := g.metrics[id]
if m == nil {
m = metric.Copy()
g.metrics[id] = m
g.ordered = append(g.ordered, m)
} else {
for _, f := range metric.FieldList() {
m.AddField(f.Key, f.Value)
}
}
}

// Metrics returns the metrics grouped by series and time.
func (g *SeriesGrouper) Metrics() []telegraf.Metric {
return g.ordered
}

func groupID(measurement string, tags map[string]string, tm time.Time) uint64 {
h := fnv.New64a()
h.Write([]byte(measurement))
h.Write([]byte("\n"))
var seed = maphash.MakeSeed()

func groupID(measurement string, taglist []*telegraf.Tag, tm time.Time) uint64 {
var mh maphash.Hash
mh.SetSeed(seed)

mh.WriteString(measurement)
mh.WriteByte(0)

taglist := make([]*telegraf.Tag, 0, len(tags))
for k, v := range tags {
taglist = append(taglist,
&telegraf.Tag{Key: k, Value: v})
}
sort.Slice(taglist, func(i, j int) bool { return taglist[i].Key < taglist[j].Key })
for _, tag := range taglist {
h.Write([]byte(tag.Key))
h.Write([]byte("\n"))
h.Write([]byte(tag.Value))
h.Write([]byte("\n"))
mh.WriteString(tag.Key)
mh.WriteByte(0)
mh.WriteString(tag.Value)
mh.WriteByte(0)
}
h.Write([]byte("\n"))
mh.WriteByte(0)

var tsBuf [8]byte
binary.BigEndian.PutUint64(tsBuf[:], uint64(tm.UnixNano()))
mh.Write(tsBuf[:])

io.WriteString(h, strconv.FormatInt(tm.UnixNano(), 10))
return h.Sum64()
return mh.Sum64()
}
34 changes: 34 additions & 0 deletions metric/series_grouper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package metric

import (
"testing"
"time"
)

var m, _ = New(
"mymetric",
map[string]string{
"host": "host.example.com",
"mykey": "myvalue",
"another key": "another value",
},
map[string]interface{}{
"f1": 1,
"f2": 2,
"f3": 3,
"f4": 4,
"f5": 5,
"f6": 6,
"f7": 7,
"f8": 8,
},
time.Now(),
)

var result uint64

func BenchmarkGroupID(b *testing.B) {
for n := 0; n < b.N; n++ {
result = groupID(m.Name(), m.TagList(), m.Time())
}
}
8 changes: 1 addition & 7 deletions plugins/aggregators/merge/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,7 @@ func (a *Merge) SampleConfig() string {
}

func (a *Merge) Add(m telegraf.Metric) {
tags := m.Tags()
for _, field := range m.FieldList() {
err := a.grouper.Add(m.Name(), tags, m.Time(), field.Key, field.Value)
if err != nil {
a.log.Errorf("Error adding metric: %v", err)
}
}
a.grouper.AddMetric(m)
}

func (a *Merge) Push(acc telegraf.Accumulator) {
Expand Down
69 changes: 68 additions & 1 deletion plugins/aggregators/merge/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

func TestSimple(t *testing.T) {
Expand Down Expand Up @@ -184,3 +186,68 @@ func TestReset(t *testing.T) {

testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
}

var m1, _ = metric.New(
"mymetric",
map[string]string{
"host": "host.example.com",
"mykey": "myvalue",
"another key": "another value",
},
map[string]interface{}{
"f1": 1,
"f2": 2,
"f3": 3,
"f4": 4,
"f5": 5,
"f6": 6,
"f7": 7,
"f8": 8,
},
time.Now(),
)
var m2, _ = metric.New(
"mymetric",
map[string]string{
"host": "host.example.com",
"mykey": "myvalue",
"another key": "another value",
},
map[string]interface{}{
"f8": 8,
"f9": 9,
"f10": 10,
"f11": 11,
"f12": 12,
"f13": 13,
"f14": 14,
"f15": 15,
"f16": 16,
},
time.Now(),
)

func BenchmarkMergeOne(b *testing.B) {
var merger Merge
merger.Init()
var acc testutil.NopAccumulator

for n := 0; n < b.N; n++ {
merger.Reset()
merger.Add(m1)
merger.Push(&acc)
}
}

func BenchmarkMergeTwo(b *testing.B) {
var merger Merge
merger.Init()
var acc testutil.NopAccumulator

for n := 0; n < b.N; n++ {
merger.Reset()
merger.Add(m1)
merger.Add(m2)
merger.Push(&acc)
}
}

0 comments on commit dc0648c

Please sign in to comment.