Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wavefront/serializer: improve performance by ~30% #5842

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 108 additions & 92 deletions plugins/serializers/wavefront/wavefront.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package wavefront

import (
"bytes"
"fmt"
"log"
"strconv"
"strings"
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs/wavefront"
Expand All @@ -16,6 +15,8 @@ type WavefrontSerializer struct {
Prefix string
UseStrict bool
SourceOverride []string
scratch buffer
mu sync.Mutex // buffer mutex
}

// catch many of the invalid chars that could appear in a metric or tag name
Expand Down Expand Up @@ -48,18 +49,16 @@ func NewSerializer(prefix string, useStrict bool, sourceOverride []string) (*Wav
return s, nil
}

// Serialize : Serialize based on Wavefront format
func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) {
out := []byte{}
metricSeparator := "."
func (s *WavefrontSerializer) serialize(buf *buffer, m telegraf.Metric) {
const metricSeparator = "."

for fieldName, value := range m.Fields() {
var name string

if fieldName == "value" {
name = fmt.Sprintf("%s%s", s.Prefix, m.Name())
name = s.Prefix + m.Name()
} else {
name = fmt.Sprintf("%s%s%s%s", s.Prefix, m.Name(), metricSeparator, fieldName)
name = s.Prefix + m.Name() + metricSeparator + fieldName
}

if s.UseStrict {
Expand All @@ -70,133 +69,150 @@ func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) {

name = pathReplacer.Replace(name)

metric := &wavefront.MetricPoint{
Metric: name,
Timestamp: m.Time().Unix(),
}

metricValue, buildError := buildValue(value, metric.Metric)
if buildError != nil {
metricValue, valid := buildValue(value, name)
if !valid {
// bad value continue to next metric
continue
}
metric.Value = metricValue

source, tags := buildTags(m.Tags(), s)
metric.Source = source
metric.Tags = tags

out = append(out, formatMetricPoint(metric, s)...)
metric := wavefront.MetricPoint{
Metric: name,
Timestamp: m.Time().Unix(),
Value: metricValue,
Source: source,
Tags: tags,
}
formatMetricPoint(&s.scratch, &metric, s)
}
}

// Serialize renders a single metric in Wavefront format and returns the
// encoded bytes.
//
// The serializer's shared scratch buffer is reused between calls, so access
// to it is guarded by s.mu and the result is copied out before the lock is
// released; the returned slice is therefore owned by the caller. The returned
// error is always nil.
func (s *WavefrontSerializer) Serialize(m telegraf.Metric) ([]byte, error) {
	s.mu.Lock()
	// Deferred so the mutex is released even if serialization panics;
	// otherwise a recovered panic would leave every later call deadlocked.
	defer s.mu.Unlock()

	s.scratch.Reset()
	s.serialize(&s.scratch, m)
	return s.scratch.Copy(), nil
}

func (s *WavefrontSerializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
var batch bytes.Buffer
s.mu.Lock()
s.scratch.Reset()
for _, m := range metrics {
buf, err := s.Serialize(m)
if err != nil {
return nil, err
}
_, err = batch.Write(buf)
if err != nil {
return nil, err
s.serialize(&s.scratch, m)
}
out := s.scratch.Copy()
s.mu.Unlock()
return out, nil
}

func findSourceTag(mTags map[string]string, s *WavefrontSerializer) string {
if src, ok := mTags["source"]; ok {
delete(mTags, "source")
return src
}
for _, src := range s.SourceOverride {
if source, ok := mTags[src]; ok {
delete(mTags, src)
mTags["telegraf_host"] = mTags["host"]
return source
}
}
return batch.Bytes(), nil
return mTags["host"]
}

func buildTags(mTags map[string]string, s *WavefrontSerializer) (string, map[string]string) {

// Remove all empty tags.
for k, v := range mTags {
if v == "" {
delete(mTags, k)
}
}

var source string

if src, ok := mTags["source"]; ok {
source = src
delete(mTags, "source")
} else {
sourceTagFound := false
for _, src := range s.SourceOverride {
for k, v := range mTags {
if k == src {
source = v
mTags["telegraf_host"] = mTags["host"]
sourceTagFound = true
delete(mTags, k)
break
}
}
if sourceTagFound {
break
}
}

if !sourceTagFound {
source = mTags["host"]
}
}

source := findSourceTag(mTags, s)
delete(mTags, "host")

return tagValueReplacer.Replace(source), mTags
}

func buildValue(v interface{}, name string) (float64, error) {
func buildValue(v interface{}, name string) (val float64, valid bool) {
switch p := v.(type) {
case bool:
if p {
return 1, nil
} else {
return 0, nil
return 1, true
}
return 0, true
case int64:
return float64(v.(int64)), nil
return float64(p), true
case uint64:
return float64(v.(uint64)), nil
return float64(p), true
case float64:
return v.(float64), nil
return p, true
case string:
// return an error but don't log
return 0, fmt.Errorf("string type not supported")
// return false but don't log
return 0, false
default:
// return an error and log a debug message
err := fmt.Errorf("unexpected type: %T, with value: %v, for :%s", v, v, name)
log.Printf("D! Serializer [wavefront] %s\n", err.Error())
return 0, err
// log a debug message
log.Printf("D! Serializer [wavefront] unexpected type: %T, with value: %v, for :%s\n",
v, v, name)
return 0, false
}
}

func formatMetricPoint(metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte {
var buffer bytes.Buffer
buffer.WriteString("\"")
buffer.WriteString(metricPoint.Metric)
buffer.WriteString("\" ")
buffer.WriteString(strconv.FormatFloat(metricPoint.Value, 'f', 6, 64))
buffer.WriteString(" ")
buffer.WriteString(strconv.FormatInt(metricPoint.Timestamp, 10))
buffer.WriteString(" source=\"")
buffer.WriteString(metricPoint.Source)
buffer.WriteString("\"")
func formatMetricPoint(b *buffer, metricPoint *wavefront.MetricPoint, s *WavefrontSerializer) []byte {
b.WriteChar('"')
b.WriteString(metricPoint.Metric)
b.WriteString(`" `)
b.WriteFloat64(metricPoint.Value)
b.WriteChar(' ')
b.WriteUint64(uint64(metricPoint.Timestamp))
b.WriteString(` source="`)
b.WriteString(metricPoint.Source)
b.WriteChar('"')

for k, v := range metricPoint.Tags {
buffer.WriteString(" \"")
b.WriteString(` "`)
if s.UseStrict {
buffer.WriteString(strictSanitizedChars.Replace(k))
b.WriteString(strictSanitizedChars.Replace(k))
} else {
buffer.WriteString(sanitizedChars.Replace(k))
b.WriteString(sanitizedChars.Replace(k))
}
buffer.WriteString("\"=\"")
buffer.WriteString(tagValueReplacer.Replace(v))
buffer.WriteString("\"")
b.WriteString(`"="`)
b.WriteString(tagValueReplacer.Replace(v))
b.WriteChar('"')
}

buffer.WriteString("\n")
b.WriteChar('\n')

return *b
}

// buffer is a small append-only byte buffer backed directly by a slice.
type buffer []byte

// Reset truncates the buffer to zero length, retaining its backing array.
func (b *buffer) Reset() {
	cur := *b
	*b = cur[:0]
}

// Copy returns a newly allocated slice holding the buffer's current contents.
// The result does not alias the buffer's backing array.
func (b *buffer) Copy() []byte {
	src := *b
	dst := make([]byte, len(src))
	copy(dst, src)
	return dst
}

// WriteString appends the bytes of s to the buffer.
func (b *buffer) WriteString(s string) {
	grown := append(*b, s...)
	*b = grown
}

// WriteChar appends the single byte c to the buffer.
//
// The method is deliberately not called WriteByte: the 'stdmethods' check of
// 'go vet' requires a method with that name to have the signature
//
//	func (b *buffer) WriteByte(c byte) error { ... }
func (b *buffer) WriteChar(c byte) {
	grown := append(*b, c)
	*b = grown
}

// WriteUint64 appends the base-10 text form of val to the buffer.
func (b *buffer) WriteUint64(val uint64) {
	const decimal = 10
	*b = strconv.AppendUint(*b, val, decimal)
}

return buffer.Bytes()
// WriteFloat64 appends val to the buffer in fixed-point ('f') notation with
// six digits after the decimal point.
func (b *buffer) WriteFloat64(val float64) {
	const (
		format    = 'f'
		precision = 6
		bitSize   = 64
	)
	*b = strconv.AppendFloat(*b, val, format, precision, bitSize)
}
49 changes: 47 additions & 2 deletions plugins/serializers/wavefront/wavefront_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/outputs/wavefront"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -132,7 +133,7 @@ func TestFormatMetricPoint(t *testing.T) {
s := WavefrontSerializer{}

for _, pt := range pointTests {
bout := formatMetricPoint(pt.ptIn, &s)
bout := formatMetricPoint(new(buffer), pt.ptIn, &s)
sout := string(bout[:])
if sout != pt.out {
t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout)
Expand Down Expand Up @@ -160,7 +161,7 @@ func TestUseStrict(t *testing.T) {
s := WavefrontSerializer{UseStrict: true}

for _, pt := range pointTests {
bout := formatMetricPoint(pt.ptIn, &s)
bout := formatMetricPoint(new(buffer), pt.ptIn, &s)
sout := string(bout[:])
if sout != pt.out {
t.Errorf("\nexpected\t%s\nreceived\t%s\n", pt.out, sout)
Expand Down Expand Up @@ -293,3 +294,47 @@ func TestSerializeMetricPrefix(t *testing.T) {
expS := []string{fmt.Sprintf("\"telegraf.cpu.usage.idle\" 91.000000 %d source=\"realHost\" \"cpu\"=\"cpu0\"", now.UnixNano()/1000000000)}
assert.Equal(t, expS, mS)
}

// benchmarkMetrics builds the fixed set of four "cpu" metrics used by the
// serializer benchmarks. Each metric carries a single "usage_idle" field with
// a different value (a float, an integer and both booleans), and all of them
// are constructed from the same tag map and timestamp. Any construction error
// aborts the benchmark.
func benchmarkMetrics(b *testing.B) [4]telegraf.Metric {
	b.Helper()

	ts := time.Now()
	sharedTags := map[string]string{
		"cpu":  "cpu0",
		"host": "realHost",
	}

	build := func(v interface{}) telegraf.Metric {
		m, err := metric.New("cpu", sharedTags, map[string]interface{}{"usage_idle": v}, ts)
		if err != nil {
			b.Fatal(err)
		}
		return m
	}

	var out [4]telegraf.Metric
	for i, v := range []interface{}{91.5, 91, true, false} {
		out[i] = build(v)
	}
	return out
}

// BenchmarkSerialize measures Serialize on one metric per iteration, cycling
// through the fixtures from benchmarkMetrics. Fixture construction is kept
// out of the timed region.
func BenchmarkSerialize(b *testing.B) {
	var s WavefrontSerializer
	samples := benchmarkMetrics(b)
	count := len(samples)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		idx := i % count
		s.Serialize(samples[idx])
	}
}

// BenchmarkSerializeBatch measures SerializeBatch on the full set of fixtures
// from benchmarkMetrics, serializing the whole batch once per iteration.
// Fixture construction is kept out of the timed region.
func BenchmarkSerializeBatch(b *testing.B) {
	var s WavefrontSerializer
	fixtures := benchmarkMetrics(b)
	batch := fixtures[:]

	b.ResetTimer()
	for remaining := b.N; remaining > 0; remaining-- {
		s.SerializeBatch(batch)
	}
}