From 34a3e3685704abf7f1be578b480ffcdcc3385c3b Mon Sep 17 00:00:00 2001 From: Andrew Dunham Date: Tue, 7 May 2019 01:11:12 +0800 Subject: [PATCH] exporter/signalfx: implement basic DistributionData support --- exporter/signalfx/signalfx.go | 61 +++++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/exporter/signalfx/signalfx.go b/exporter/signalfx/signalfx.go index 27d0238..002d747 100644 --- a/exporter/signalfx/signalfx.go +++ b/exporter/signalfx/signalfx.go @@ -131,36 +131,57 @@ func (e *Exporter) Flush() { // toMetric receives the view data information and creates metrics that are adequate according to // graphite documentation. -func (e *Exporter) toMetric(v *view.View, row *view.Row, vd *view.Data) signalFxMetric { +func (e *Exporter) toMetric(v *view.View, row *view.Row, vd *view.Data) []signalFxMetric { switch data := row.Data.(type) { case *view.CountData: - return signalFxMetric{ + return []signalFxMetric{{ metricName: sanitize(vd.View.Name), metricType: "cumulative_counter", metricValueInt: data.Value, timestamp: time.Now(), dimensions: buildDimensions(row.Tags), - } + }} case *view.SumData: - return signalFxMetric{ + return []signalFxMetric{{ metricName: sanitize(vd.View.Name), metricType: "cumulative_counter", metricValueInt: int64(data.Value), timestamp: time.Now(), dimensions: buildDimensions(row.Tags), - } + }} case *view.LastValueData: - return signalFxMetric{ + return []signalFxMetric{{ metricName: sanitize(vd.View.Name), metricType: "gauge", metricValue: data.Value, timestamp: time.Now(), dimensions: buildDimensions(row.Tags), + }} + case *view.DistributionData: + subMetrics := map[string]float64{ + "avg": data.Mean, + "count": float64(data.Count), + "max": data.Max, + "min": data.Min, + } + + ret := []signalFxMetric{} + for name, val := range subMetrics { + ret = append(ret, signalFxMetric{ + metricName: sanitize(vd.View.Name + "." + name), + metricType: "gauge", + metricValue: val, + timestamp: time.Now(), + dimensions: buildDimensions(row.Tags), + }) } + + // TODO: add support for counts per bucket, quantiles, etc.? + return ret + default: - // TODO: add support for histograms (Aggregation.DistributionData). e.opts.onError(fmt.Errorf("aggregation %T is not yet supported", data)) - return signalFxMetric{} + return nil } } @@ -190,20 +211,20 @@ func (e *Exporter) sendBundle(vds []*view.Data) { ctx := context.Background() for _, vd := range vds { for _, row := range vd.Rows { - metric := e.toMetric(vd.View, row, vd) + metrics := e.toMetric(vd.View, row, vd) data := []*datapoint.Datapoint{} - switch metric.metricType { - case "gauge": - data = []*datapoint.Datapoint{ - sfxclient.GaugeF(metric.metricName, metric.dimensions, metric.metricValue), - } - case "cumulative_counter": - data = []*datapoint.Datapoint{ - sfxclient.Cumulative(metric.metricName, metric.dimensions, metric.metricValueInt), - } - default: - e.opts.onError(fmt.Errorf("metric type not supported: %s", metric.metricType)) + for _, metric := range metrics { + switch metric.metricType { + case "gauge": + data = append(data, sfxclient.GaugeF(metric.metricName, metric.dimensions, metric.metricValue)) + + case "cumulative_counter": + data = append(data, sfxclient.Cumulative(metric.metricName, metric.dimensions, metric.metricValueInt)) + + default: + e.opts.onError(fmt.Errorf("metric type not supported: %s", metric.metricType)) + } } err := e.client.Sink.AddDatapoints(ctx, data)