use buckets by timeSeries key
Nicolas Bazire committed Feb 14, 2019
1 parent a954546 commit db55582
Showing 1 changed file with 47 additions and 52 deletions.
99 changes: 47 additions & 52 deletions plugins/outputs/stackdriver/stackdriver.go
@@ -113,29 +113,33 @@ func sorted(metrics []telegraf.Metric) []telegraf.Metric {
 	return batch
 }
 
-func getFieldHash(m telegraf.Metric, f *telegraf.Field) uint64 {
+type timeSeriesBuckets map[uint64][]*monitoringpb.TimeSeries
+
+func (tsb timeSeriesBuckets) Add(m telegraf.Metric, f *telegraf.Field, ts *monitoringpb.TimeSeries) {
 	h := fnv.New64a()
 	h.Write([]byte(m.Name()))
-	h.Write([]byte("\n"))
+	h.Write([]byte{'\n'})
 	h.Write([]byte(f.Key))
-	h.Write([]byte("\n"))
+	h.Write([]byte{'\n'})
 	for key, value := range m.Tags() {
 		h.Write([]byte(key))
-		h.Write([]byte("\n"))
+		h.Write([]byte{'\n'})
 		h.Write([]byte(value))
-		h.Write([]byte("\n"))
+		h.Write([]byte{'\n'})
 	}
-	return h.Sum64()
+	k := h.Sum64()
+
+	s := tsb[k]
+	s = append(s, ts)
+	tsb[k] = s
 }
 
 // Write the metrics to Google Cloud Stackdriver.
 func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
 	ctx := context.Background()
 
 	batch := sorted(metrics)
-	timeSeries := []*monitoringpb.TimeSeries{}
-	processedTimeSeries := map[uint64]bool{}
-
+	buckets := make(timeSeriesBuckets)
 	for _, m := range batch {
 		for _, f := range m.FieldList() {
 			value, err := getStackdriverTypedValue(f.Value)
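
For reference, a minimal standalone sketch of the bucket keying introduced in the hunk above, not part of the diff: plain name/field/tag arguments stand in for telegraf.Metric and *telegraf.Field, and a string stands in for *monitoringpb.TimeSeries. The names seriesBuckets and add are illustrative only; tag keys are sorted here so the key is deterministic, whereas the plugin hashes m.Tags() in map-iteration order.

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// seriesBuckets mirrors timeSeriesBuckets, with strings in place of
// *monitoringpb.TimeSeries values.
type seriesBuckets map[uint64][]string

// add hashes the metric name, field key, and tags with FNV-64a, separated by
// newlines as in the diff, and appends the series under the resulting key.
func (b seriesBuckets) add(name, field string, tags map[string]string, series string) {
	h := fnv.New64a()
	h.Write([]byte(name))
	h.Write([]byte{'\n'})
	h.Write([]byte(field))
	h.Write([]byte{'\n'})
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		h.Write([]byte(k))
		h.Write([]byte{'\n'})
		h.Write([]byte(tags[k]))
		h.Write([]byte{'\n'})
	}
	key := h.Sum64()
	b[key] = append(b[key], series)
}

func main() {
	b := make(seriesBuckets)
	tags := map[string]string{"host": "a"}
	// Two points for the same (name, field, tags) combination share a bucket.
	b.add("cpu", "usage", tags, "cpu usage point 1")
	b.add("cpu", "usage", tags, "cpu usage point 2")
	b.add("mem", "free", tags, "mem free point 1")
	fmt.Println(len(b)) // 2: one bucket per distinct time series
}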
@@ -148,32 +152,6 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
 				continue
 			}
 
-			fieldHash := getFieldHash(m, f)
-			_, duplicateInRequest := processedTimeSeries[fieldHash]
-
-			if duplicateInRequest || len(timeSeries) >= 200 {
-				// Prepare time series request.
-				timeSeriesRequest := &monitoringpb.CreateTimeSeriesRequest{
-					Name: monitoring.MetricProjectPath(s.Project),
-					TimeSeries: timeSeries,
-				}
-
-				// Create the time series in Stackdriver.
-				err := s.client.CreateTimeSeries(ctx, timeSeriesRequest)
-				if err != nil {
-					if strings.Contains(err.Error(), errStringPointsOutOfOrder) ||
-						strings.Contains(err.Error(), errStringPointsTooOld) ||
-						strings.Contains(err.Error(), errStringPointsTooFrequent) {
-						log.Printf("D! [outputs.stackdriver] unable to write to Stackdriver: %s", err)
-						return nil
-					}
-					log.Printf("E! [outputs.stackdriver] unable to write to Stackdriver: %s", err)
-					return err
-				}
-				timeSeries = []*monitoringpb.TimeSeries{}
-				processedTimeSeries = map[uint64]bool{}
-			}
-
 			metricKind, err := getStackdriverMetricKind(m.Type())
 			if err != nil {
 				log.Printf("E! [outputs.stackdriver] get metric failed: %s", err)
@@ -193,26 +171,43 @@ func (s *Stackdriver) Write(metrics []telegraf.Metric) error {
 			}
 
 			// Prepare time series.
-			timeSeries = append(timeSeries,
-				&monitoringpb.TimeSeries{
-					Metric: &metricpb.Metric{
-						Type: path.Join("custom.googleapis.com", s.Namespace, m.Name(), f.Key),
-						Labels: getStackdriverLabels(m.TagList()),
-					},
-					MetricKind: metricKind,
-					Resource: &monitoredrespb.MonitoredResource{
-						Type: s.ResourceType,
-						Labels: s.ResourceLabels,
-					},
-					Points: []*monitoringpb.Point{
-						dataPoint,
-					},
-				})
-			processedTimeSeries[fieldHash] = true
+			timeSeries := &monitoringpb.TimeSeries{
+				Metric: &metricpb.Metric{
+					Type: path.Join("custom.googleapis.com", s.Namespace, m.Name(), f.Key),
+					Labels: getStackdriverLabels(m.TagList()),
+				},
+				MetricKind: metricKind,
+				Resource: &monitoredrespb.MonitoredResource{
+					Type: s.ResourceType,
+					Labels: s.ResourceLabels,
+				},
+				Points: []*monitoringpb.Point{
+					dataPoint,
+				},
+			}
+
+			buckets.Add(m, f, timeSeries)
 		}
 	}
 
-	if len(timeSeries) > 0 {
+	for len(buckets) != 0 {
+		// can send up to 200 time series to stackdriver
+		timeSeries := make([]*monitoringpb.TimeSeries, 0, 200)
+		for k, s := range buckets {
+			timeSeries = append(timeSeries, s[0])
+			if len(s) == 1 {
+				delete(buckets, k)
+				continue
+			}
+
+			s = s[1:]
+			buckets[k] = s
+
+			if len(timeSeries) == cap(timeSeries) {
+				break
+			}
+		}
+
 		// Prepare time series request.
 		timeSeriesRequest := &monitoringpb.CreateTimeSeriesRequest{
 			Name: monitoring.MetricProjectPath(s.Project),
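The loop added at the end of the last hunk drains the buckets into batches: each pass takes at most one series per bucket, so a single request never repeats a time series, and stops at 200 series (the cap noted in the code comment). A minimal standalone sketch of that drain, not part of the diff, again with strings standing in for *monitoringpb.TimeSeries and made-up sample data:

package main

import "fmt"

func main() {
	// Buckets keyed by the FNV hash of (name, field, tags); each slice holds
	// successive points for the same time series.
	buckets := map[uint64][]string{
		1: {"cpu usage point 1", "cpu usage point 2"},
		2: {"mem free point 1"},
	}

	const limit = 200 // up to 200 time series per Stackdriver request
	for len(buckets) != 0 {
		batch := make([]string, 0, limit)
		for k, s := range buckets {
			// One series per bucket per batch, so no duplicates in a request.
			batch = append(batch, s[0])
			if len(s) == 1 {
				delete(buckets, k)
			} else {
				buckets[k] = s[1:]
			}
			if len(batch) == cap(batch) {
				break
			}
		}
		// In the plugin, batch becomes the TimeSeries field of a
		// CreateTimeSeriesRequest and is sent before the next pass.
		fmt.Println(batch)
	}
}

Here the first pass sends one point from each bucket and the second pass sends the leftover cpu point, mirroring how the plugin spreads repeated points for the same series across separate requests.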
