From d37ac30698db594a21f49d8b39fe43ac8c67dc5e Mon Sep 17 00:00:00 2001 From: Dominik Rosiek Date: Thu, 11 Apr 2024 11:48:16 +0200 Subject: [PATCH] feat: move files from Sumo Logic repository which are new Signed-off-by: Dominik Rosiek --- .../sumologicexporter/deduplicate_errors.go | 49 +++++ .../deduplicate_errors_test.go | 58 ++++++ exporter/sumologicexporter/otlp.go | 149 +++++++++++++++ exporter/sumologicexporter/otlp_test.go | 172 ++++++++++++++++++ 4 files changed, 428 insertions(+) create mode 100644 exporter/sumologicexporter/deduplicate_errors.go create mode 100644 exporter/sumologicexporter/deduplicate_errors_test.go create mode 100644 exporter/sumologicexporter/otlp.go create mode 100644 exporter/sumologicexporter/otlp_test.go diff --git a/exporter/sumologicexporter/deduplicate_errors.go b/exporter/sumologicexporter/deduplicate_errors.go new file mode 100644 index 000000000000..3e265ef6d2d8 --- /dev/null +++ b/exporter/sumologicexporter/deduplicate_errors.go @@ -0,0 +1,49 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter" + +import "fmt" + +// deduplicateErrors replaces duplicate instances of the same error in a slice +// with a single error containing the number of times it occurred added as a suffix. +// For example, three occurrences of "error: 502 Bad Gateway" +// are replaced with a single instance of "error: 502 Bad Gateway (x3)". +func deduplicateErrors(errs []error) []error { + if len(errs) < 2 { + return errs + } + + errorsWithCounts := []errorWithCount{} + for _, err := range errs { + found := false + for i := range errorsWithCounts { + if errorsWithCounts[i].err.Error() == err.Error() { + found = true + errorsWithCounts[i].count++ + break + } + } + if !found { + errorsWithCounts = append(errorsWithCounts, errorWithCount{ + err: err, + count: 1, + }) + } + } + + var uniqueErrors []error + for _, errorWithCount := range errorsWithCounts { + if errorWithCount.count == 1 { + uniqueErrors = append(uniqueErrors, errorWithCount.err) + } else { + uniqueErrors = append(uniqueErrors, fmt.Errorf("%s (x%d)", errorWithCount.err.Error(), errorWithCount.count)) + } + } + return uniqueErrors +} + +type errorWithCount struct { + err error + count int +} diff --git a/exporter/sumologicexporter/deduplicate_errors_test.go b/exporter/sumologicexporter/deduplicate_errors_test.go new file mode 100644 index 000000000000..5f8a0d3c1f21 --- /dev/null +++ b/exporter/sumologicexporter/deduplicate_errors_test.go @@ -0,0 +1,58 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter" + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDeduplicateErrors(t *testing.T) { + testCases := []struct { + name string + errs []error + expected []error + }{ + { + name: "nil is returned as nil", + errs: nil, + expected: nil, + }, + { + name: "single error is returned as-is", + errs: []error{ + errors.New("Single error"), + }, + expected: []error{ + errors.New("Single error"), + }, + }, + { + name: "duplicates are removed", + errs: []error{ + errors.New("failed sending data: 502 Bad Gateway"), + errors.New("failed sending data: 400 Bad Request"), + errors.New("failed sending data: 502 Bad Gateway"), + errors.New("failed sending data: 400 Bad Request"), + errors.New("failed sending data: 400 Bad Request"), + errors.New("failed sending data: 400 Bad Request"), + errors.New("failed sending data: 504 Gateway Timeout"), + errors.New("failed sending data: 502 Bad Gateway"), + }, + expected: []error{ + errors.New("failed sending data: 502 Bad Gateway (x3)"), + errors.New("failed sending data: 400 Bad Request (x4)"), + errors.New("failed sending data: 504 Gateway Timeout"), + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + assert.Equal(t, testCase.expected, deduplicateErrors(testCase.errs)) + }) + } +} diff --git a/exporter/sumologicexporter/otlp.go b/exporter/sumologicexporter/otlp.go new file mode 100644 index 000000000000..a243011f95a9 --- /dev/null +++ b/exporter/sumologicexporter/otlp.go @@ -0,0 +1,149 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter" + +import ( + "math" + + "go.opentelemetry.io/collector/pdata/pmetric" +) + +// decomposeHistograms decomposes any histograms present in the metric data into individual Sums and Gauges +// This is a noop if no Histograms are present, but otherwise makes a copy of the whole structure +// This exists because Sumo doesn't support OTLP histograms yet, and has the same semantics as the conversion to Prometheus format in prometheus_formatter.go +func decomposeHistograms(md pmetric.Metrics) pmetric.Metrics { + // short circuit and do nothing if no Histograms are present + foundHistogram := false +outer: + for i := 0; i < md.ResourceMetrics().Len(); i++ { + resourceMetric := md.ResourceMetrics().At(i) + for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ { + scopeMetric := resourceMetric.ScopeMetrics().At(j) + for k := 0; k < scopeMetric.Metrics().Len(); k++ { + foundHistogram = scopeMetric.Metrics().At(k).Type() == pmetric.MetricTypeHistogram + if foundHistogram { + break outer + } + } + } + } + if !foundHistogram { + return md + } + + decomposed := pmetric.NewMetrics() + md.CopyTo(decomposed) + + for i := 0; i < decomposed.ResourceMetrics().Len(); i++ { + resourceMetric := decomposed.ResourceMetrics().At(i) + for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ { + metrics := resourceMetric.ScopeMetrics().At(j).Metrics() + for k := 0; k < metrics.Len(); k++ { + metric := metrics.At(k) + if metric.Type() == pmetric.MetricTypeHistogram { + decomposedHistogram := decomposeHistogram(metric) + decomposedHistogram.MoveAndAppendTo(metrics) + } + } + metrics.RemoveIf(func(m pmetric.Metric) bool { return m.Type() == pmetric.MetricTypeHistogram }) + } + } + + return decomposed +} + +// decomposeHistogram decomposes a single Histogram metric into individual metrics for count, bucket and sum +// non-Histograms give an empty slice as output +func decomposeHistogram(metric pmetric.Metric) pmetric.MetricSlice { + output := pmetric.NewMetricSlice() + if metric.Type() != pmetric.MetricTypeHistogram { + return output + } + + getHistogramSumMetric(metric).MoveTo(output.AppendEmpty()) + getHistogramCountMetric(metric).MoveTo(output.AppendEmpty()) + getHistogramBucketsMetric(metric).MoveTo(output.AppendEmpty()) + + return output +} + +func getHistogramBucketsMetric(metric pmetric.Metric) pmetric.Metric { + histogram := metric.Histogram() + + bucketsMetric := pmetric.NewMetric() + bucketsMetric.SetName(metric.Name() + "_bucket") + bucketsMetric.SetDescription(metric.Description()) + bucketsMetric.SetUnit(metric.Unit()) + bucketsMetric.SetEmptyGauge() + bucketsDatapoints := bucketsMetric.Gauge().DataPoints() + + for i := 0; i < histogram.DataPoints().Len(); i++ { + histogramDataPoint := histogram.DataPoints().At(i) + histogramBounds := histogramDataPoint.ExplicitBounds() + var cumulative uint64 + + for j := 0; j < histogramBounds.Len(); j++ { + bucketDataPoint := bucketsDatapoints.AppendEmpty() + bound := histogramBounds.At(j) + histogramDataPoint.Attributes().CopyTo(bucketDataPoint.Attributes()) + bucketDataPoint.Attributes().PutDouble(prometheusLeTag, bound) + bucketDataPoint.SetStartTimestamp(histogramDataPoint.StartTimestamp()) + bucketDataPoint.SetTimestamp(histogramDataPoint.Timestamp()) + cumulative += histogramDataPoint.BucketCounts().At(j) + bucketDataPoint.SetIntValue(int64(cumulative)) + } + + // need to add one more bucket at +Inf + bucketDataPoint := bucketsDatapoints.AppendEmpty() + histogramDataPoint.Attributes().CopyTo(bucketDataPoint.Attributes()) + bucketDataPoint.Attributes().PutDouble(prometheusLeTag, math.Inf(1)) + bucketDataPoint.SetStartTimestamp(histogramDataPoint.StartTimestamp()) + bucketDataPoint.SetTimestamp(histogramDataPoint.Timestamp()) + cumulative += histogramDataPoint.BucketCounts().At(histogramDataPoint.ExplicitBounds().Len()) + bucketDataPoint.SetIntValue(int64(cumulative)) + } + return bucketsMetric +} + +func getHistogramSumMetric(metric pmetric.Metric) pmetric.Metric { + histogram := metric.Histogram() + + sumMetric := pmetric.NewMetric() + sumMetric.SetName(metric.Name() + "_sum") + sumMetric.SetDescription(metric.Description()) + sumMetric.SetUnit(metric.Unit()) + sumMetric.SetEmptyGauge() + sumDataPoints := sumMetric.Gauge().DataPoints() + + for i := 0; i < histogram.DataPoints().Len(); i++ { + histogramDataPoint := histogram.DataPoints().At(i) + sumDataPoint := sumDataPoints.AppendEmpty() + histogramDataPoint.Attributes().CopyTo(sumDataPoint.Attributes()) + sumDataPoint.SetStartTimestamp(histogramDataPoint.StartTimestamp()) + sumDataPoint.SetTimestamp(histogramDataPoint.Timestamp()) + sumDataPoint.SetDoubleValue(histogramDataPoint.Sum()) + } + return sumMetric +} + +func getHistogramCountMetric(metric pmetric.Metric) pmetric.Metric { + histogram := metric.Histogram() + + countMetric := pmetric.NewMetric() + countMetric.SetName(metric.Name() + "_count") + countMetric.SetDescription(metric.Description()) + countMetric.SetUnit(metric.Unit()) + countMetric.SetEmptyGauge() + countDataPoints := countMetric.Gauge().DataPoints() + + for i := 0; i < histogram.DataPoints().Len(); i++ { + histogramDataPoint := histogram.DataPoints().At(i) + countDataPoint := countDataPoints.AppendEmpty() + histogramDataPoint.Attributes().CopyTo(countDataPoint.Attributes()) + countDataPoint.SetStartTimestamp(histogramDataPoint.StartTimestamp()) + countDataPoint.SetTimestamp(histogramDataPoint.Timestamp()) + countDataPoint.SetIntValue(int64(histogramDataPoint.Count())) + } + return countMetric +} diff --git a/exporter/sumologicexporter/otlp_test.go b/exporter/sumologicexporter/otlp_test.go new file mode 100644 index 000000000000..f80395efdece --- /dev/null +++ b/exporter/sumologicexporter/otlp_test.go @@ -0,0 +1,172 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter" + +import ( + "math" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +const ( + timestamp1 = 1618124444.169 * 1e9 + timestamp2 = 1608424699.186 * 1e9 +) + +func TestHistogramDecomposeNoHistogram(t *testing.T) { + mp := exampleIntGaugeMetric() + metric := mp.metric + resourceAttributes := mp.attributes + metrics := pmetric.NewMetrics() + resourceAttributes.CopyTo(metrics.ResourceMetrics().AppendEmpty().Resource().Attributes()) + metric.MoveTo(metrics.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()) + decomposedMetrics := decomposeHistograms(metrics) + assert.Equal(t, metrics, decomposedMetrics) +} + +func TestHistogramDecompose(t *testing.T) { + metrics := metricsWithHistogram() + decomposedMetrics := decomposeHistograms(metrics) + assert.Equal(t, metrics.ResourceMetrics().At(0).Resource(), decomposedMetrics.ResourceMetrics().At(0).Resource()) + expectedMetrics := pmetric.NewMetrics() + expectedResourceMetric := expectedMetrics.ResourceMetrics().AppendEmpty() + metrics.ResourceMetrics().At(0).Resource().Attributes().CopyTo(expectedResourceMetric.Resource().Attributes()) + expectedMetricSlice := expectedResourceMetric.ScopeMetrics().AppendEmpty().Metrics() + addExpectedHistogramSum(expectedMetricSlice) + addExpectedHistogramCount(expectedMetricSlice) + addExpectedHistogramBuckets(expectedMetricSlice) + assert.Equal(t, expectedMetrics, decomposedMetrics) +} + +func metricsWithHistogram() pmetric.Metrics { + metrics := pmetric.NewMetrics() + resourceMetric := metrics.ResourceMetrics().AppendEmpty() + resourceMetric.Resource().Attributes().PutStr("key", "value") + scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty() + metric := scopeMetric.Metrics().AppendEmpty() + + metric.SetEmptyHistogram() + metric.SetUnit("unit") + metric.SetName("histogram_metric_double_test") + metric.SetDescription("Test histogram metric") + + dp := metric.Histogram().DataPoints().AppendEmpty() + dp.Attributes().PutStr("container", "dolor") + + si := pcommon.NewUInt64Slice() + si.FromRaw([]uint64{0, 12, 7, 5, 8, 13}) + si.CopyTo(dp.BucketCounts()) + + sf := pcommon.NewFloat64Slice() + sf.FromRaw([]float64{0.1, 0.2, 0.5, 0.8, 1}) + sf.CopyTo(dp.ExplicitBounds()) + + dp.SetTimestamp(timestamp1) + dp.SetSum(45.6) + dp.SetCount(45) + + dp = metric.Histogram().DataPoints().AppendEmpty() + dp.Attributes().PutStr("container", "sit") + + si = pcommon.NewUInt64Slice() + si.FromRaw([]uint64{0, 10, 1, 1, 4, 6}) + si.CopyTo(dp.BucketCounts()) + + sf = pcommon.NewFloat64Slice() + sf.FromRaw([]float64{0.1, 0.2, 0.5, 0.8, 1}) + sf.CopyTo(dp.ExplicitBounds()) + + dp.SetTimestamp(timestamp2) + dp.SetSum(54.1) + dp.SetCount(22) + + return metrics +} + +func addExpectedHistogramSum(metrics pmetric.MetricSlice) { + metric := metrics.AppendEmpty() + metric.SetName("histogram_metric_double_test_sum") + metric.SetDescription("Test histogram metric") + metric.SetUnit("unit") + metric.SetEmptyGauge() + + dataPoint := metric.Gauge().DataPoints().AppendEmpty() + dataPoint.Attributes().PutStr("container", "dolor") + dataPoint.SetTimestamp(timestamp1) + dataPoint.SetDoubleValue(45.6) + + dataPoint = metric.Gauge().DataPoints().AppendEmpty() + dataPoint.Attributes().PutStr("container", "sit") + dataPoint.SetTimestamp(timestamp2) + dataPoint.SetDoubleValue(54.1) +} + +func addExpectedHistogramCount(metrics pmetric.MetricSlice) { + metric := metrics.AppendEmpty() + metric.SetName("histogram_metric_double_test_count") + metric.SetDescription("Test histogram metric") + metric.SetUnit("unit") + metric.SetEmptyGauge() + + dataPoint := metric.Gauge().DataPoints().AppendEmpty() + dataPoint.Attributes().PutStr("container", "dolor") + dataPoint.SetTimestamp(timestamp1) + dataPoint.SetIntValue(45) + + dataPoint = metric.Gauge().DataPoints().AppendEmpty() + dataPoint.Attributes().PutStr("container", "sit") + dataPoint.SetTimestamp(timestamp2) + dataPoint.SetIntValue(22) +} + +func addExpectedHistogramBuckets(metrics pmetric.MetricSlice) { + metric := metrics.AppendEmpty() + metric.SetName("histogram_metric_double_test_bucket") + metric.SetDescription("Test histogram metric") + metric.SetUnit("unit") + metric.SetEmptyGauge() + histogramBuckets := []struct { + float64 + int64 + }{ + {0.1, 0}, + {0.2, 12}, + {0.5, 19}, + {0.8, 24}, + {1, 32}, + {math.Inf(1), 45}, + } + for _, pair := range histogramBuckets { + bound, bucketCount := pair.float64, pair.int64 + dataPoint := metric.Gauge().DataPoints().AppendEmpty() + dataPoint.Attributes().PutStr("container", "dolor") + dataPoint.Attributes().PutDouble(prometheusLeTag, bound) + dataPoint.SetTimestamp(timestamp1) + dataPoint.SetIntValue(bucketCount) + } + + histogramBuckets = []struct { + float64 + int64 + }{ + {0.1, 0}, + {0.2, 10}, + {0.5, 11}, + {0.8, 12}, + {1, 16}, + {math.Inf(1), 22}, + } + for _, pair := range histogramBuckets { + bound, bucketCount := pair.float64, pair.int64 + dataPoint := metric.Gauge().DataPoints().AppendEmpty() + dataPoint.Attributes().PutStr("container", "sit") + dataPoint.Attributes().PutDouble(prometheusLeTag, bound) + dataPoint.SetTimestamp(timestamp2) + dataPoint.SetIntValue(bucketCount) + } + +}