Skip to content

Commit

Permalink
[exporter/elasticsearch] Log and drop invalid metrics instead of retu…
Browse files Browse the repository at this point in the history
…rning error to avoid upstream retries (#35740)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

Log metrics validation error instead of returning to avoid upstream
retries

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue

<!--Describe what testing was performed and which tests were added.-->
#### Testing

<!--Describe the documentation added.-->
#### Documentation

<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
carsonip authored Oct 15, 2024
1 parent fb114a6 commit e6c14a5
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 12 deletions.
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_log-metrics-validation-error.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Log and drop invalid metrics instead of returning error to avoid upstream retries

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35740]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
24 changes: 16 additions & 8 deletions exporter/elasticsearchexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
)
Expand Down Expand Up @@ -187,7 +188,10 @@ func (e *elasticsearchExporter) pushMetricsData(
}
defer session.End()

var errs []error
var (
validationErrs []error // log instead of returning these so that upstream does not retry
errs []error
)
resourceMetrics := metrics.ResourceMetrics()
for i := 0; i < resourceMetrics.Len(); i++ {
resourceMetric := resourceMetrics.At(i)
Expand Down Expand Up @@ -224,7 +228,7 @@ func (e *elasticsearchExporter) pushMetricsData(
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
if err := upsertDataPoint(newNumberDataPoint(dp)); err != nil {
errs = append(errs, err)
validationErrs = append(validationErrs, err)
continue
}
}
Expand All @@ -233,33 +237,33 @@ func (e *elasticsearchExporter) pushMetricsData(
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
if err := upsertDataPoint(newNumberDataPoint(dp)); err != nil {
errs = append(errs, err)
validationErrs = append(validationErrs, err)
continue
}
}
case pmetric.MetricTypeExponentialHistogram:
if metric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
errs = append(errs, fmt.Errorf("dropping cumulative temporality exponential histogram %q", metric.Name()))
validationErrs = append(validationErrs, fmt.Errorf("dropping cumulative temporality exponential histogram %q", metric.Name()))
continue
}
dps := metric.ExponentialHistogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
if err := upsertDataPoint(newExponentialHistogramDataPoint(dp)); err != nil {
errs = append(errs, err)
validationErrs = append(validationErrs, err)
continue
}
}
case pmetric.MetricTypeHistogram:
if metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
errs = append(errs, fmt.Errorf("dropping cumulative temporality histogram %q", metric.Name()))
validationErrs = append(validationErrs, fmt.Errorf("dropping cumulative temporality histogram %q", metric.Name()))
continue
}
dps := metric.Histogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
if err := upsertDataPoint(newHistogramDataPoint(dp)); err != nil {
errs = append(errs, err)
validationErrs = append(validationErrs, err)
continue
}
}
Expand All @@ -268,14 +272,18 @@ func (e *elasticsearchExporter) pushMetricsData(
for l := 0; l < dps.Len(); l++ {
dp := dps.At(l)
if err := upsertDataPoint(newSummaryDataPoint(dp)); err != nil {
errs = append(errs, err)
validationErrs = append(validationErrs, err)
continue
}
}
}
}
}

if len(validationErrs) > 0 {
e.Logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...)))
}

for fIndex, docs := range resourceDocs {
for _, doc := range docs {
var (
Expand Down
7 changes: 3 additions & 4 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ func TestExporterMetrics(t *testing.T) {
fooDp.BucketCounts().FromRaw([]uint64{1, 2, 3, 4})

err := exporter.ConsumeMetrics(context.Background(), metrics)
assert.ErrorContains(t, err, "dropping cumulative temporality histogram \"metric.foo\"")
assert.NoError(t, err)
})

t.Run("publish exponential histogram cumulative temporality", func(t *testing.T) {
Expand Down Expand Up @@ -921,7 +921,7 @@ func TestExporterMetrics(t *testing.T) {
fooDp.Negative().BucketCounts().FromRaw([]uint64{1, 0, 0, 1})

err := exporter.ConsumeMetrics(context.Background(), metrics)
assert.ErrorContains(t, err, "dropping cumulative temporality exponential histogram \"metric.foo\"")
assert.NoError(t, err)
})

t.Run("publish only valid data points", func(t *testing.T) {
Expand Down Expand Up @@ -960,8 +960,7 @@ func TestExporterMetrics(t *testing.T) {
barOtherDp.SetDoubleValue(1.0)

err := exporter.ConsumeMetrics(context.Background(), metrics)
require.ErrorContains(t, err, "invalid histogram data point")
require.ErrorContains(t, err, "invalid number data point")
assert.NoError(t, err)

rec.WaitItems(2)

Expand Down

0 comments on commit e6c14a5

Please sign in to comment.