From e6c14a596e56b4489d8fa6e08ff7b29c4fd1f07a Mon Sep 17 00:00:00 2001
From: Carson Ip
Date: Tue, 15 Oct 2024 11:05:10 +0100
Subject: [PATCH] [exporter/elasticsearch] Log and drop invalid metrics instead
 of returning error to avoid upstream retries (#35740)

#### Description
Log metrics validation errors instead of returning them, to avoid upstream retries.

#### Link to tracking issue

#### Testing

#### Documentation
---
 ...exporter_log-metrics-validation-error.yaml | 27 +++++++++++++++++++
 exporter/elasticsearchexporter/exporter.go    | 24 +++++++++++------
 .../elasticsearchexporter/exporter_test.go    |  7 +++--
 3 files changed, 46 insertions(+), 12 deletions(-)
 create mode 100644 .chloggen/elasticsearchexporter_log-metrics-validation-error.yaml

diff --git a/.chloggen/elasticsearchexporter_log-metrics-validation-error.yaml b/.chloggen/elasticsearchexporter_log-metrics-validation-error.yaml
new file mode 100644
index 000000000000..b9c837287830
--- /dev/null
+++ b/.chloggen/elasticsearchexporter_log-metrics-validation-error.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: elasticsearchexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Log and drop invalid metrics instead of returning error to avoid upstream retries
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [35740]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go
index 17d2dc8578f0..b7e00b713cd8 100644
--- a/exporter/elasticsearchexporter/exporter.go
+++ b/exporter/elasticsearchexporter/exporter.go
@@ -18,6 +18,7 @@ import (
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pdata/ptrace"
+	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel"
 )
@@ -187,7 +188,10 @@ func (e *elasticsearchExporter) pushMetricsData(
 	}
 	defer session.End()
 
-	var errs []error
+	var (
+		validationErrs []error // log instead of returning these so that upstream does not retry
+		errs           []error
+	)
 	resourceMetrics := metrics.ResourceMetrics()
 	for i := 0; i < resourceMetrics.Len(); i++ {
 		resourceMetric := resourceMetrics.At(i)
@@ -224,7 +228,7 @@ func (e *elasticsearchExporter) pushMetricsData(
 				for l := 0; l < dps.Len(); l++ {
 					dp := dps.At(l)
 					if err := upsertDataPoint(newNumberDataPoint(dp)); err != nil {
-						errs = append(errs, err)
+						validationErrs = append(validationErrs, err)
 						continue
 					}
 				}
@@ -233,33 +237,33 @@ func (e *elasticsearchExporter) pushMetricsData(
 				for l := 0; l < dps.Len(); l++ {
 					dp := dps.At(l)
 					if err := upsertDataPoint(newNumberDataPoint(dp)); err != nil {
-						errs = append(errs, err)
+						validationErrs = append(validationErrs, err)
 						continue
 					}
 				}
 			case pmetric.MetricTypeExponentialHistogram:
 				if metric.ExponentialHistogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
-					errs = append(errs, fmt.Errorf("dropping cumulative temporality exponential histogram %q", metric.Name()))
+					validationErrs = append(validationErrs, fmt.Errorf("dropping cumulative temporality exponential histogram %q", metric.Name()))
 					continue
 				}
 				dps := metric.ExponentialHistogram().DataPoints()
 				for l := 0; l < dps.Len(); l++ {
 					dp := dps.At(l)
 					if err := upsertDataPoint(newExponentialHistogramDataPoint(dp)); err != nil {
-						errs = append(errs, err)
+						validationErrs = append(validationErrs, err)
 						continue
 					}
 				}
 			case pmetric.MetricTypeHistogram:
 				if metric.Histogram().AggregationTemporality() == pmetric.AggregationTemporalityCumulative {
-					errs = append(errs, fmt.Errorf("dropping cumulative temporality histogram %q", metric.Name()))
+					validationErrs = append(validationErrs, fmt.Errorf("dropping cumulative temporality histogram %q", metric.Name()))
 					continue
 				}
 				dps := metric.Histogram().DataPoints()
 				for l := 0; l < dps.Len(); l++ {
 					dp := dps.At(l)
 					if err := upsertDataPoint(newHistogramDataPoint(dp)); err != nil {
-						errs = append(errs, err)
+						validationErrs = append(validationErrs, err)
 						continue
 					}
 				}
@@ -268,7 +272,7 @@ func (e *elasticsearchExporter) pushMetricsData(
 				for l := 0; l < dps.Len(); l++ {
 					dp := dps.At(l)
 					if err := upsertDataPoint(newSummaryDataPoint(dp)); err != nil {
-						errs = append(errs, err)
+						validationErrs = append(validationErrs, err)
 						continue
 					}
 				}
@@ -276,6 +280,10 @@ func (e *elasticsearchExporter) pushMetricsData(
 			}
 		}
 
+		if len(validationErrs) > 0 {
+			e.Logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...)))
+		}
+
 		for fIndex, docs := range resourceDocs {
 			for _, doc := range docs {
 				var (
diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go
index a84ef00193a4..06d02e0272c7 100644
--- a/exporter/elasticsearchexporter/exporter_test.go
+++ b/exporter/elasticsearchexporter/exporter_test.go
@@ -890,7 +890,7 @@ func TestExporterMetrics(t *testing.T) {
 		fooDp.BucketCounts().FromRaw([]uint64{1, 2, 3, 4})
 
 		err := exporter.ConsumeMetrics(context.Background(), metrics)
-		assert.ErrorContains(t, err, "dropping cumulative temporality histogram \"metric.foo\"")
+		assert.NoError(t, err)
 	})
 
 	t.Run("publish exponential histogram cumulative temporality", func(t *testing.T) {
@@ -921,7 +921,7 @@ func TestExporterMetrics(t *testing.T) {
 		fooDp.Negative().BucketCounts().FromRaw([]uint64{1, 0, 0, 1})
 
 		err := exporter.ConsumeMetrics(context.Background(), metrics)
-		assert.ErrorContains(t, err, "dropping cumulative temporality exponential histogram \"metric.foo\"")
+		assert.NoError(t, err)
 	})
 
 	t.Run("publish only valid data points", func(t *testing.T) {
@@ -960,8 +960,7 @@ func TestExporterMetrics(t *testing.T) {
 		barOtherDp.SetDoubleValue(1.0)
 
 		err := exporter.ConsumeMetrics(context.Background(), metrics)
-		require.ErrorContains(t, err, "invalid histogram data point")
-		require.ErrorContains(t, err, "invalid number data point")
+		assert.NoError(t, err)
 
 		rec.WaitItems(2)
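
For orientation, here is a minimal, self-contained Go sketch of the pattern this patch applies: per-item validation failures are accumulated separately, logged once through zap, and dropped, while only transient send failures are returned to the caller. The `item`, `send`, and `push` names below are illustrative stand-ins, not the exporter's real API.

```go
package main

import (
	"errors"
	"fmt"

	"go.uber.org/zap"
)

// item stands in for a metric data point; an empty name marks it invalid.
type item struct{ name string }

// send stands in for the bulk-indexer call, which may fail transiently.
func send(it item) error { return nil }

// push mirrors the shape of the change: validation errors are logged and
// dropped so that a fronting retry sender does not re-send data that can
// never become valid; only retryable errors are returned.
func push(logger *zap.Logger, items []item) error {
	var (
		validationErrs []error // logged, not returned
		errs           []error // returned, eligible for retry
	)
	for _, it := range items {
		if it.name == "" {
			validationErrs = append(validationErrs, fmt.Errorf("dropping item without a name"))
			continue
		}
		if err := send(it); err != nil {
			errs = append(errs, err)
		}
	}
	if len(validationErrs) > 0 {
		logger.Warn("validation errors", zap.Error(errors.Join(validationErrs...)))
	}
	return errors.Join(errs...)
}

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()
	if err := push(logger, []item{{name: "ok"}, {}}); err != nil {
		logger.Error("push failed", zap.Error(err))
	}
}
```

This is also why the tests above now expect `assert.NoError` for cumulative-temporality histograms and invalid data points: those inputs are dropped and reported in the exporter's log instead of being surfaced as an error to the consumer.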