Skip to content

Commit

Permalink
[chore][exporter/sumologic]: add deduplicateErrors and `decomposeHi…
Browse files Browse the repository at this point in the history
…stograms` functions (#32316)

**Description:**

Adds files from Sumo Logic Distribution for OpenTelemetry which are new
in order to simplify review and make simpler to move the whole
component.

This PR is made to simplify #32315

**Link to tracking Issue:**

#31479

**Testing:**

Unit Tests, also this code is use by our customers and internally. It is
not used by now by the component, but will be when migration will be
finished

**Documentation:** N/A

Signed-off-by: Dominik Rosiek <[email protected]>
  • Loading branch information
sumo-drosiek authored Apr 16, 2024
1 parent 8dce114 commit 1b44bce
Show file tree
Hide file tree
Showing 4 changed files with 428 additions and 0 deletions.
49 changes: 49 additions & 0 deletions exporter/sumologicexporter/deduplicate_errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter"

import "fmt"

// deduplicateErrors replaces duplicate instances of the same error in a slice
// with a single error containing the number of times it occurred added as a suffix.
// For example, three occurrences of "error: 502 Bad Gateway"
// are replaced with a single instance of "error: 502 Bad Gateway (x3)".
func deduplicateErrors(errs []error) []error {
if len(errs) < 2 {
return errs
}

errorsWithCounts := []errorWithCount{}
for _, err := range errs {
found := false
for i := range errorsWithCounts {
if errorsWithCounts[i].err.Error() == err.Error() {
found = true
errorsWithCounts[i].count++
break
}
}
if !found {
errorsWithCounts = append(errorsWithCounts, errorWithCount{
err: err,
count: 1,
})
}
}

var uniqueErrors []error
for _, errorWithCount := range errorsWithCounts {
if errorWithCount.count == 1 {
uniqueErrors = append(uniqueErrors, errorWithCount.err)
} else {
uniqueErrors = append(uniqueErrors, fmt.Errorf("%s (x%d)", errorWithCount.err.Error(), errorWithCount.count))
}
}
return uniqueErrors
}

type errorWithCount struct {
err error
count int
}
58 changes: 58 additions & 0 deletions exporter/sumologicexporter/deduplicate_errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter"

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

func TestDeduplicateErrors(t *testing.T) {
testCases := []struct {
name string
errs []error
expected []error
}{
{
name: "nil is returned as nil",
errs: nil,
expected: nil,
},
{
name: "single error is returned as-is",
errs: []error{
errors.New("Single error"),
},
expected: []error{
errors.New("Single error"),
},
},
{
name: "duplicates are removed",
errs: []error{
errors.New("failed sending data: 502 Bad Gateway"),
errors.New("failed sending data: 400 Bad Request"),
errors.New("failed sending data: 502 Bad Gateway"),
errors.New("failed sending data: 400 Bad Request"),
errors.New("failed sending data: 400 Bad Request"),
errors.New("failed sending data: 400 Bad Request"),
errors.New("failed sending data: 504 Gateway Timeout"),
errors.New("failed sending data: 502 Bad Gateway"),
},
expected: []error{
errors.New("failed sending data: 502 Bad Gateway (x3)"),
errors.New("failed sending data: 400 Bad Request (x4)"),
errors.New("failed sending data: 504 Gateway Timeout"),
},
},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
assert.Equal(t, testCase.expected, deduplicateErrors(testCase.errs))
})
}
}
149 changes: 149 additions & 0 deletions exporter/sumologicexporter/otlp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sumologicexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/sumologicexporter"

import (
"math"

"go.opentelemetry.io/collector/pdata/pmetric"
)

// decomposeHistograms decomposes any histograms present in the metric data into individual Sums and Gauges
// This is a noop if no Histograms are present, but otherwise makes a copy of the whole structure
// This exists because Sumo doesn't support OTLP histograms yet, and has the same semantics as the conversion to Prometheus format in prometheus_formatter.go
func decomposeHistograms(md pmetric.Metrics) pmetric.Metrics {
// short circuit and do nothing if no Histograms are present
foundHistogram := false
outer:
for i := 0; i < md.ResourceMetrics().Len(); i++ {
resourceMetric := md.ResourceMetrics().At(i)
for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ {
scopeMetric := resourceMetric.ScopeMetrics().At(j)
for k := 0; k < scopeMetric.Metrics().Len(); k++ {
foundHistogram = scopeMetric.Metrics().At(k).Type() == pmetric.MetricTypeHistogram
if foundHistogram {
break outer
}
}
}
}
if !foundHistogram {
return md
}

decomposed := pmetric.NewMetrics()
md.CopyTo(decomposed)

for i := 0; i < decomposed.ResourceMetrics().Len(); i++ {
resourceMetric := decomposed.ResourceMetrics().At(i)
for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ {
metrics := resourceMetric.ScopeMetrics().At(j).Metrics()
for k := 0; k < metrics.Len(); k++ {
metric := metrics.At(k)
if metric.Type() == pmetric.MetricTypeHistogram {
decomposedHistogram := decomposeHistogram(metric)
decomposedHistogram.MoveAndAppendTo(metrics)
}
}
metrics.RemoveIf(func(m pmetric.Metric) bool { return m.Type() == pmetric.MetricTypeHistogram })
}
}

return decomposed
}

// decomposeHistogram decomposes a single Histogram metric into individual metrics for count, bucket and sum
// non-Histograms give an empty slice as output
func decomposeHistogram(metric pmetric.Metric) pmetric.MetricSlice {
output := pmetric.NewMetricSlice()
if metric.Type() != pmetric.MetricTypeHistogram {
return output
}

getHistogramSumMetric(metric).MoveTo(output.AppendEmpty())
getHistogramCountMetric(metric).MoveTo(output.AppendEmpty())
getHistogramBucketsMetric(metric).MoveTo(output.AppendEmpty())

return output
}

func getHistogramBucketsMetric(metric pmetric.Metric) pmetric.Metric {
histogram := metric.Histogram()

bucketsMetric := pmetric.NewMetric()
bucketsMetric.SetName(metric.Name() + "_bucket")
bucketsMetric.SetDescription(metric.Description())
bucketsMetric.SetUnit(metric.Unit())
bucketsMetric.SetEmptyGauge()
bucketsDatapoints := bucketsMetric.Gauge().DataPoints()

for i := 0; i < histogram.DataPoints().Len(); i++ {
histogramDataPoint := histogram.DataPoints().At(i)
histogramBounds := histogramDataPoint.ExplicitBounds()
var cumulative uint64

for j := 0; j < histogramBounds.Len(); j++ {
bucketDataPoint := bucketsDatapoints.AppendEmpty()
bound := histogramBounds.At(j)
histogramDataPoint.Attributes().CopyTo(bucketDataPoint.Attributes())
bucketDataPoint.Attributes().PutDouble(prometheusLeTag, bound)
bucketDataPoint.SetStartTimestamp(histogramDataPoint.StartTimestamp())
bucketDataPoint.SetTimestamp(histogramDataPoint.Timestamp())
cumulative += histogramDataPoint.BucketCounts().At(j)
bucketDataPoint.SetIntValue(int64(cumulative))
}

// need to add one more bucket at +Inf
bucketDataPoint := bucketsDatapoints.AppendEmpty()
histogramDataPoint.Attributes().CopyTo(bucketDataPoint.Attributes())
bucketDataPoint.Attributes().PutDouble(prometheusLeTag, math.Inf(1))
bucketDataPoint.SetStartTimestamp(histogramDataPoint.StartTimestamp())
bucketDataPoint.SetTimestamp(histogramDataPoint.Timestamp())
cumulative += histogramDataPoint.BucketCounts().At(histogramDataPoint.ExplicitBounds().Len())
bucketDataPoint.SetIntValue(int64(cumulative))
}
return bucketsMetric
}

func getHistogramSumMetric(metric pmetric.Metric) pmetric.Metric {
histogram := metric.Histogram()

sumMetric := pmetric.NewMetric()
sumMetric.SetName(metric.Name() + "_sum")
sumMetric.SetDescription(metric.Description())
sumMetric.SetUnit(metric.Unit())
sumMetric.SetEmptyGauge()
sumDataPoints := sumMetric.Gauge().DataPoints()

for i := 0; i < histogram.DataPoints().Len(); i++ {
histogramDataPoint := histogram.DataPoints().At(i)
sumDataPoint := sumDataPoints.AppendEmpty()
histogramDataPoint.Attributes().CopyTo(sumDataPoint.Attributes())
sumDataPoint.SetStartTimestamp(histogramDataPoint.StartTimestamp())
sumDataPoint.SetTimestamp(histogramDataPoint.Timestamp())
sumDataPoint.SetDoubleValue(histogramDataPoint.Sum())
}
return sumMetric
}

func getHistogramCountMetric(metric pmetric.Metric) pmetric.Metric {
histogram := metric.Histogram()

countMetric := pmetric.NewMetric()
countMetric.SetName(metric.Name() + "_count")
countMetric.SetDescription(metric.Description())
countMetric.SetUnit(metric.Unit())
countMetric.SetEmptyGauge()
countDataPoints := countMetric.Gauge().DataPoints()

for i := 0; i < histogram.DataPoints().Len(); i++ {
histogramDataPoint := histogram.DataPoints().At(i)
countDataPoint := countDataPoints.AppendEmpty()
histogramDataPoint.Attributes().CopyTo(countDataPoint.Attributes())
countDataPoint.SetStartTimestamp(histogramDataPoint.StartTimestamp())
countDataPoint.SetTimestamp(histogramDataPoint.Timestamp())
countDataPoint.SetIntValue(int64(histogramDataPoint.Count()))
}
return countMetric
}
Loading

0 comments on commit 1b44bce

Please sign in to comment.