Skip to content

Commit

Permalink
Group stackdriver requests to send one point per timeseries (influxda…
Browse files Browse the repository at this point in the history
  • Loading branch information
Legogris authored and Mathieu Lecarme committed Apr 17, 2020
1 parent 009bd61 commit 287a975
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 20 deletions.
85 changes: 65 additions & 20 deletions plugins/outputs/stackdriver/stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stackdriver
import (
"context"
"fmt"
"hash/fnv"
"log"
"path"
"sort"
Expand Down Expand Up @@ -111,15 +112,34 @@ func sorted(metrics []telegraf.Metric) []telegraf.Metric {
return batch
}

type timeSeriesBuckets map[uint64][]*monitoringpb.TimeSeries

func (tsb timeSeriesBuckets) Add(m telegraf.Metric, f *telegraf.Field, ts *monitoringpb.TimeSeries) {
h := fnv.New64a()
h.Write([]byte(m.Name()))
h.Write([]byte{'\n'})
h.Write([]byte(f.Key))
h.Write([]byte{'\n'})
for key, value := range m.Tags() {
h.Write([]byte(key))
h.Write([]byte{'\n'})
h.Write([]byte(value))
h.Write([]byte{'\n'})
}
k := h.Sum64()

s := tsb[k]
s = append(s, ts)
tsb[k] = s
}

// Write the metrics to Google Cloud Stackdriver.
func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
ctx := context.Background()

batch := sorted(metrics)

buckets := make(timeSeriesBuckets)
for _, m := range batch {
timeSeries := []*monitoringpb.TimeSeries{}

for _, f := range m.FieldList() {
value, err := getStackdriverTypedValue(f.Value)
if err != nil {
Expand Down Expand Up @@ -150,25 +170,50 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
}

// Prepare time series.
timeSeries = append(timeSeries,
&monitoringpb.TimeSeries{
Metric: &metricpb.Metric{
Type: path.Join("custom.googleapis.com", s.Namespace, m.Name(), f.Key),
Labels: getStackdriverLabels(m.TagList()),
},
MetricKind: metricKind,
Resource: &monitoredrespb.MonitoredResource{
Type: s.ResourceType,
Labels: s.ResourceLabels,
},
Points: []*monitoringpb.Point{
dataPoint,
},
})
timeSeries := &monitoringpb.TimeSeries{
Metric: &metricpb.Metric{
Type: path.Join("custom.googleapis.com", s.Namespace, m.Name(), f.Key),
Labels: getStackdriverLabels(m.TagList()),
},
MetricKind: metricKind,
Resource: &monitoredrespb.MonitoredResource{
Type: s.ResourceType,
Labels: s.ResourceLabels,
},
Points: []*monitoringpb.Point{
dataPoint,
},
}

buckets.Add(m, f, timeSeries)
}
}

if len(timeSeries) < 1 {
continue
// process the buckets in order
keys := make([]uint64, 0, len(buckets))
for k := range buckets {
keys = append(keys, k)
}
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

for len(buckets) != 0 {
// can send up to 200 time series to stackdriver
timeSeries := make([]*monitoringpb.TimeSeries, 0, 200)
for i, k := range keys {
s := buckets[k]
timeSeries = append(timeSeries, s[0])
if len(s) == 1 {
delete(buckets, k)
keys = append(keys[:i], keys[i+1:]...)
continue
}

s = s[1:]
buckets[k] = s

if len(timeSeries) == cap(timeSeries) {
break
}
}

// Prepare time series request.
Expand Down
83 changes: 83 additions & 0 deletions plugins/outputs/stackdriver/stackdriver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,89 @@ func TestWriteAscendingTime(t *testing.T) {
})
}

func TestWriteBatchable(t *testing.T) {
expectedResponse := &emptypb.Empty{}
mockMetric.err = nil
mockMetric.reqs = nil
mockMetric.resps = append(mockMetric.resps[:0], expectedResponse)

c, err := monitoring.NewMetricClient(context.Background(), clientOpt)
if err != nil {
t.Fatal(err)
}

s := &Stackdriver{
Project: fmt.Sprintf("projects/%s", "[PROJECT]"),
Namespace: "test",
client: c,
}

// Metrics in descending order of timestamp
metrics := []telegraf.Metric{
testutil.MustMetric("cpu",
map[string]string{
"foo": "bar",
},
map[string]interface{}{
"value": 42,
},
time.Unix(2, 0),
),
testutil.MustMetric("cpu",
map[string]string{
"foo": "foo",
},
map[string]interface{}{
"value": 43,
},
time.Unix(3, 0),
),
testutil.MustMetric("cpu",
map[string]string{
"foo": "bar",
},
map[string]interface{}{
"value": 43,
},
time.Unix(1, 0),
),
}

err = s.Connect()
require.NoError(t, err)
err = s.Write(metrics)
require.NoError(t, err)

require.Len(t, mockMetric.reqs, 2)
request := mockMetric.reqs[0].(*monitoringpb.CreateTimeSeriesRequest)
require.Len(t, request.TimeSeries, 2)
ts := request.TimeSeries[0]
require.Len(t, ts.Points, 1)
require.Equal(t, ts.Points[0].Interval, &monitoringpb.TimeInterval{
EndTime: &googlepb.Timestamp{
Seconds: 3,
},
})
require.Equal(t, ts.Points[0].Value, &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: int64(43),
},
})

ts = request.TimeSeries[1]
require.Len(t, ts.Points, 1)
require.Equal(t, ts.Points[0].Interval, &monitoringpb.TimeInterval{
EndTime: &googlepb.Timestamp{
Seconds: 1,
},
})
require.Equal(t, ts.Points[0].Value, &monitoringpb.TypedValue{
Value: &monitoringpb.TypedValue_Int64Value{
Int64Value: int64(43),
},
})
}

func TestWriteIgnoredErrors(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit 287a975

Please sign in to comment.