Optimize SeriesGrouper & aggregators.merge #8391

Merged 1 commit on Jan 7, 2021

71 changes: 48 additions & 23 deletions metric/series_grouper.go
@@ -1,10 +1,9 @@
 package metric
 
 import (
-	"hash/fnv"
-	"io"
+	"encoding/binary"
+	"hash/maphash"
 	"sort"
-	"strconv"
 	"time"
 
 	"github.com/influxdata/telegraf"
@@ -23,14 +22,17 @@
 // + cpu,host=localhost idle_time=42,usage_time=42
 func NewSeriesGrouper() *SeriesGrouper {
 	return &SeriesGrouper{
-		metrics: make(map[uint64]telegraf.Metric),
-		ordered: []telegraf.Metric{},
+		metrics:  make(map[uint64]telegraf.Metric),
+		ordered:  []telegraf.Metric{},
+		hashSeed: maphash.MakeSeed(),
 	}
 }
 
 type SeriesGrouper struct {
 	metrics map[uint64]telegraf.Metric
 	ordered []telegraf.Metric
+
+	hashSeed maphash.Seed
 }
 
 // Add adds a field key and value to the series.
@@ -41,8 +43,15 @@ func (g *SeriesGrouper) Add(
 	field string,
 	fieldValue interface{},
 ) error {
+	taglist := make([]*telegraf.Tag, 0, len(tags))
+	for k, v := range tags {
+		taglist = append(taglist,
+			&telegraf.Tag{Key: k, Value: v})
+	}
+	sort.Slice(taglist, func(i, j int) bool { return taglist[i].Key < taglist[j].Key })
+
 	var err error
-	id := groupID(measurement, tags, tm)
+	id := groupID(g.hashSeed, measurement, taglist, tm)
 	metric := g.metrics[id]
 	if metric == nil {
 		metric, err = New(measurement, tags, map[string]interface{}{field: fieldValue}, tm)
@@ -57,30 +66,46 @@ func (g *SeriesGrouper) Add(
 	return nil
 }
 
+// AddMetric adds a metric to the series, merging with any previous matching metrics.
+func (g *SeriesGrouper) AddMetric(
+	metric telegraf.Metric,
+) {
+	id := groupID(g.hashSeed, metric.Name(), metric.TagList(), metric.Time())
+	m := g.metrics[id]
+	if m == nil {
+		m = metric.Copy()
+		g.metrics[id] = m
+		g.ordered = append(g.ordered, m)
+	} else {
+		for _, f := range metric.FieldList() {
+			m.AddField(f.Key, f.Value)
+		}
+	}
+}
+
 // Metrics returns the metrics grouped by series and time.
 func (g *SeriesGrouper) Metrics() []telegraf.Metric {
 	return g.ordered
 }
 
-func groupID(measurement string, tags map[string]string, tm time.Time) uint64 {
-	h := fnv.New64a()
-	h.Write([]byte(measurement))
-	h.Write([]byte("\n"))
+func groupID(seed maphash.Seed, measurement string, taglist []*telegraf.Tag, tm time.Time) uint64 {
+	var mh maphash.Hash
+	mh.SetSeed(seed)
+
+	mh.WriteString(measurement)
+	mh.WriteByte(0)
 
-	taglist := make([]*telegraf.Tag, 0, len(tags))
-	for k, v := range tags {
-		taglist = append(taglist,
-			&telegraf.Tag{Key: k, Value: v})
-	}
-	sort.Slice(taglist, func(i, j int) bool { return taglist[i].Key < taglist[j].Key })
 	for _, tag := range taglist {
-		h.Write([]byte(tag.Key))
-		h.Write([]byte("\n"))
-		h.Write([]byte(tag.Value))
-		h.Write([]byte("\n"))
+		mh.WriteString(tag.Key)
+		mh.WriteByte(0)
+		mh.WriteString(tag.Value)
+		mh.WriteByte(0)
 	}
-	h.Write([]byte("\n"))
+	mh.WriteByte(0)
+
+	var tsBuf [8]byte
+	binary.BigEndian.PutUint64(tsBuf[:], uint64(tm.UnixNano()))
+	mh.Write(tsBuf[:])
 
-	io.WriteString(h, strconv.FormatInt(tm.UnixNano(), 10))
-	return h.Sum64()
+	return mh.Sum64()
 }
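
For context, a minimal usage sketch (not part of the change): NewSeriesGrouper, AddMetric and Metrics come from the diff above and metric.New from the tests in this PR, while the metric names, tags and values here are purely illustrative.

package main

import (
	"fmt"
	"time"

	"github.com/influxdata/telegraf/metric"
)

func main() {
	g := metric.NewSeriesGrouper()
	now := time.Now()
	tags := map[string]string{"host": "localhost"}

	// Metrics that share a name, tag set and timestamp hash to the same group
	// ID, so AddMetric folds their fields into a single output metric.
	idle, _ := metric.New("cpu", tags, map[string]interface{}{"idle_time": 42}, now)
	usage, _ := metric.New("cpu", tags, map[string]interface{}{"usage_time": 42}, now)
	g.AddMetric(idle)
	g.AddMetric(usage)

	for _, m := range g.Metrics() {
		fmt.Println(m.Name(), m.Fields()) // cpu map[idle_time:42 usage_time:42]
	}
}
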
37 changes: 37 additions & 0 deletions metric/series_grouper_test.go
@@ -0,0 +1,37 @@
+package metric
+
+import (
+	"hash/maphash"
+	"testing"
+	"time"
+)
+
+var m, _ = New(
+	"mymetric",
+	map[string]string{
+		"host":        "host.example.com",
+		"mykey":       "myvalue",
+		"another key": "another value",
+	},
+	map[string]interface{}{
+		"f1": 1,
+		"f2": 2,
+		"f3": 3,
+		"f4": 4,
+		"f5": 5,
+		"f6": 6,
+		"f7": 7,
+		"f8": 8,
+	},
+	time.Now(),
+)
+
+var result uint64
+
+var hashSeed = maphash.MakeSeed()
+
+func BenchmarkGroupID(b *testing.B) {
+	for n := 0; n < b.N; n++ {
+		result = groupID(hashSeed, m.Name(), m.TagList(), m.Time())
+	}
+}
8 changes: 1 addition & 7 deletions plugins/aggregators/merge/merge.go
@@ -36,13 +36,7 @@ func (a *Merge) SampleConfig() string {
 }
 
 func (a *Merge) Add(m telegraf.Metric) {
-	tags := m.Tags()
-	for _, field := range m.FieldList() {
-		err := a.grouper.Add(m.Name(), tags, m.Time(), field.Key, field.Value)
-		if err != nil {
-			a.log.Errorf("Error adding metric: %v", err)
-		}
-	}
+	a.grouper.AddMetric(m)
 }
 
 func (a *Merge) Push(acc telegraf.Accumulator) {
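
In effect, the aggregator no longer builds a tag map via m.Tags() and no longer hashes the same series once per field: AddMetric computes the group ID once per metric and either copies the metric on first sight or folds its fields into the existing entry. The new benchmarks below exercise exactly this path.
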
69 changes: 68 additions & 1 deletion plugins/aggregators/merge/merge_test.go
@@ -4,9 +4,11 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/require"
+
 	"github.com/influxdata/telegraf"
+	"github.com/influxdata/telegraf/metric"
 	"github.com/influxdata/telegraf/testutil"
-	"github.com/stretchr/testify/require"
 )
 
 func TestSimple(t *testing.T) {
@@ -184,3 +186,68 @@ func TestReset(t *testing.T) {
 
 	testutil.RequireMetricsEqual(t, expected, acc.GetTelegrafMetrics())
 }
+
+var m1, _ = metric.New(
+	"mymetric",
+	map[string]string{
+		"host":        "host.example.com",
+		"mykey":       "myvalue",
+		"another key": "another value",
+	},
+	map[string]interface{}{
+		"f1": 1,
+		"f2": 2,
+		"f3": 3,
+		"f4": 4,
+		"f5": 5,
+		"f6": 6,
+		"f7": 7,
+		"f8": 8,
+	},
+	time.Now(),
+)
+var m2, _ = metric.New(
+	"mymetric",
+	map[string]string{
+		"host":        "host.example.com",
+		"mykey":       "myvalue",
+		"another key": "another value",
+	},
+	map[string]interface{}{
+		"f8":  8,
+		"f9":  9,
+		"f10": 10,
+		"f11": 11,
+		"f12": 12,
+		"f13": 13,
+		"f14": 14,
+		"f15": 15,
+		"f16": 16,
+	},
+	m1.Time(),
+)
+
+func BenchmarkMergeOne(b *testing.B) {
+	var merger Merge
+	merger.Init()
+	var acc testutil.NopAccumulator
+
+	for n := 0; n < b.N; n++ {
+		merger.Reset()
+		merger.Add(m1)
+		merger.Push(&acc)
+	}
+}
+
+func BenchmarkMergeTwo(b *testing.B) {
+	var merger Merge
+	merger.Init()
+	var acc testutil.NopAccumulator
+
+	for n := 0; n < b.N; n++ {
+		merger.Reset()
+		merger.Add(m1)
+		merger.Add(m2)
+		merger.Push(&acc)
+	}
+}
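
The new benchmarks in both files can be run with the standard Go tooling, for example (from the repository root; the exact flags are only a suggestion):

go test -run=NONE -bench=. -benchmem ./metric/ ./plugins/aggregators/merge/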