diff --git a/exporters/otlp/otlplog/otlploghttp/client.go b/exporters/otlp/otlplog/otlploghttp/client.go index 1539bec7b901..8a11309be834 100644 --- a/exporters/otlp/otlplog/otlploghttp/client.go +++ b/exporters/otlp/otlplog/otlploghttp/client.go @@ -149,10 +149,15 @@ func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs) if err != nil { return err } + if resp != nil && resp.Body != nil { + defer func() { + if err := resp.Body.Close(); err != nil { + otel.Handle(err) + } + }() + } - var rErr error - switch sc := resp.StatusCode; { - case sc >= 200 && sc <= 299: + if sc := resp.StatusCode; sc >= 200 && sc <= 299 { // Success, do not retry. // Read the partial success message, if any. @@ -180,38 +185,41 @@ func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs) } } return nil - case sc == http.StatusTooManyRequests, - sc == http.StatusBadGateway, - sc == http.StatusServiceUnavailable, - sc == http.StatusGatewayTimeout: - // Retry-able failure. - rErr = newResponseError(resp.Header, nil) - - // server may return a message with the response - // body, so we read it to include in the error - // message to be returned. It will help in - // debugging the actual issue. - var respData bytes.Buffer - if _, err := io.Copy(&respData, resp.Body); err != nil { - _ = resp.Body.Close() - return err - } - - // overwrite the error message with the response body - // if it is not empty - if respStr := strings.TrimSpace(respData.String()); respStr != "" { - // Include response for context. - e := errors.New(respStr) - rErr = newResponseError(resp.Header, e) - } - default: - rErr = fmt.Errorf("failed to send logs to %s: %s", request.URL, resp.Status) } - - if err := resp.Body.Close(); err != nil { + // Error cases. + + // server may return a message with the response + // body, so we read it to include in the error + // message to be returned. It will help in + // debugging the actual issue. + var respData bytes.Buffer + if _, err := io.Copy(&respData, resp.Body); err != nil { return err } - return rErr + respStr := strings.TrimSpace(respData.String()) + + switch resp.StatusCode { + case http.StatusTooManyRequests, + http.StatusBadGateway, + http.StatusServiceUnavailable, + http.StatusGatewayTimeout: + // Retryable failure. + + var err error + if len(respStr) > 0 { + // include response body for context + err = errors.New(respStr) + } + return newResponseError(resp.Header, err) + default: + // Non-retryable failure. + if len(respStr) > 0 { + // include response body for context + err = errors.New(respStr) + return fmt.Errorf("failed to send logs to %s: %s (%w)", request.URL, resp.Status, err) + } + return fmt.Errorf("failed to send logs to %s: %s", request.URL, resp.Status) + } }) } diff --git a/exporters/otlp/otlplog/otlploghttp/client_test.go b/exporters/otlp/otlplog/otlploghttp/client_test.go index 8b9eb945ea9d..a181587e207f 100644 --- a/exporters/otlp/otlplog/otlploghttp/client_test.go +++ b/exporters/otlp/otlplog/otlploghttp/client_test.go @@ -779,3 +779,39 @@ func TestConfig(t *testing.T) { assert.Equal(t, []string{headerValueSetInProxy}, got[headerKeySetInProxy]) }) } + +// borrows from TestConfig +func TestNonRetryable(t *testing.T) { + factoryFunc := func(ePt string, rCh <-chan exportResult, o ...Option) (log.Exporter, *httpCollector) { + coll, err := newHTTPCollector(ePt, rCh) + require.NoError(t, err) + + opts := []Option{WithEndpoint(coll.Addr().String())} + if !strings.HasPrefix(strings.ToLower(ePt), "https") { + opts = append(opts, WithInsecure()) + } + opts = append(opts, o...) + + ctx := context.Background() + exp, err := New(ctx, opts...) + require.NoError(t, err) + return exp, coll + } + exporterErr := errors.New("missing required attribute aaaa") + rCh := make(chan exportResult, 1) + rCh <- exportResult{Err: &httpResponseError{ + Status: http.StatusBadRequest, + Err: exporterErr, + }} + + exp, coll := factoryFunc("", rCh, WithRetry(RetryConfig{ + Enabled: false, + })) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + // Push this after Shutdown so the HTTP server doesn't hang. + t.Cleanup(func() { close(rCh) }) + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + err := exp.Export(ctx, make([]log.Record, 1)) + assert.ErrorContains(t, err, exporterErr.Error()) +} diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go index 7ef295e59e36..7765ec36cf80 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go @@ -152,10 +152,15 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou if err != nil { return err } + if resp != nil && resp.Body != nil { + defer func() { + if err := resp.Body.Close(); err != nil { + otel.Handle(err) + } + }() + } - var rErr error - switch sc := resp.StatusCode; { - case sc >= 200 && sc <= 299: + if sc := resp.StatusCode; sc >= 200 && sc <= 299 { // Success, do not retry. // Read the partial success message, if any. @@ -183,38 +188,42 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou } } return nil - case sc == http.StatusTooManyRequests, - sc == http.StatusBadGateway, - sc == http.StatusServiceUnavailable, - sc == http.StatusGatewayTimeout: - // Retry-able failure. - rErr = newResponseError(resp.Header, nil) - - // server may return a message with the response - // body, so we read it to include in the error - // message to be returned. It will help in - // debugging the actual issue. - var respData bytes.Buffer - if _, err := io.Copy(&respData, resp.Body); err != nil { - _ = resp.Body.Close() - return err + } + // Error cases. + + // server may return a message with the response + // body, so we read it to include in the error + // message to be returned. It will help in + // debugging the actual issue. + var respData bytes.Buffer + if _, err := io.Copy(&respData, resp.Body); err != nil { + return err + } + respStr := strings.TrimSpace(respData.String()) + + switch resp.StatusCode { + case http.StatusTooManyRequests, + http.StatusBadGateway, + http.StatusServiceUnavailable, + http.StatusGatewayTimeout: + // Retryable failure. + + var err error + if len(respStr) > 0 { + // include response body for context + err = errors.New(respStr) } + return newResponseError(resp.Header, err) + default: + // Non-retryable failure. - // overwrite the error message with the response body - // if it is not empty - if respStr := strings.TrimSpace(respData.String()); respStr != "" { - // Include response for context. + if len(respStr) > 0 { + // include response body for context e := errors.New(respStr) - rErr = newResponseError(resp.Header, e) + return fmt.Errorf("failed to send metrics to %s: %s (%w)", request.URL, resp.Status, e) } - default: - rErr = fmt.Errorf("failed to send metrics to %s: %s", request.URL, resp.Status) - } - - if err := resp.Body.Close(); err != nil { - return err + return fmt.Errorf("failed to send metrics to %s: %s", request.URL, resp.Status) } - return rErr }) } diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go index 3f65e6fb5396..48804f5e90f1 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go @@ -271,3 +271,40 @@ func TestConfig(t *testing.T) { assert.Equal(t, []string{headerValueSetInProxy}, got[headerKeySetInProxy]) }) } + +// borrows from TestConfig +func TestNonRetryable(t *testing.T) { + factoryFunc := func(ePt string, rCh <-chan otest.ExportResult, o ...Option) (metric.Exporter, *otest.HTTPCollector) { + coll, err := otest.NewHTTPCollector(ePt, rCh) + require.NoError(t, err) + + opts := []Option{WithEndpoint(coll.Addr().String())} + if !strings.HasPrefix(strings.ToLower(ePt), "https") { + opts = append(opts, WithInsecure()) + } + opts = append(opts, o...) + + ctx := context.Background() + exp, err := New(ctx, opts...) + require.NoError(t, err) + return exp, coll + } + exporterErr := errors.New("missing required attribute aaa") + rCh := make(chan otest.ExportResult, 1) + rCh <- otest.ExportResult{Err: &otest.HTTPResponseError{ + Status: http.StatusBadRequest, + Err: exporterErr, + }} + exp, coll := factoryFunc("", rCh) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + // Push this after Shutdown so the HTTP server doesn't hang. + t.Cleanup(func() { close(rCh) }) + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + exCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + err := exp.Export(exCtx, &metricdata.ResourceMetrics{}) + assert.ErrorContains(t, err, exporterErr.Error()) + + assert.NoError(t, exCtx.Err()) +} diff --git a/exporters/otlp/otlptrace/otlptracehttp/client.go b/exporters/otlp/otlptrace/otlptracehttp/client.go index bb2f3ffd1d8a..2b7002a80e6d 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/client.go +++ b/exporters/otlp/otlptrace/otlptracehttp/client.go @@ -166,8 +166,7 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc }() } - switch sc := resp.StatusCode; { - case sc >= 200 && sc <= 299: + if sc := resp.StatusCode; sc >= 200 && sc <= 299 { // Success, do not retry. // Read the partial success message, if any. var respData bytes.Buffer @@ -194,33 +193,40 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc } } return nil - - case sc == http.StatusTooManyRequests, - sc == http.StatusBadGateway, - sc == http.StatusServiceUnavailable, - sc == http.StatusGatewayTimeout: - // Retry-able failures. - rErr := newResponseError(resp.Header, nil) - - // server may return a message with the response - // body, so we read it to include in the error - // message to be returned. It will help in - // debugging the actual issue. - var respData bytes.Buffer - if _, err := io.Copy(&respData, resp.Body); err != nil { - _ = resp.Body.Close() - return err + } + // Error cases. + + // server may return a message with the response + // body, so we read it to include in the error + // message to be returned. It will help in + // debugging the actual issue. + var respData bytes.Buffer + if _, err := io.Copy(&respData, resp.Body); err != nil { + return err + } + respStr := strings.TrimSpace(respData.String()) + + switch resp.StatusCode { + case http.StatusTooManyRequests, + http.StatusBadGateway, + http.StatusServiceUnavailable, + http.StatusGatewayTimeout: + // Retryable failure. + + var err error + if len(respStr) > 0 { + // include response body for context + err = errors.New(respStr) } + return newResponseError(resp.Header, err) + default: + // Non-retryable failure. - // overwrite the error message with the response body - // if it is not empty - if respStr := strings.TrimSpace(respData.String()); respStr != "" { - // Include response for context. + if len(respStr) > 0 { + // include response body for context e := errors.New(respStr) - rErr = newResponseError(resp.Header, e) + return fmt.Errorf("failed to send to %s: %s (%w)", request.URL, resp.Status, e) } - return rErr - default: return fmt.Errorf("failed to send to %s: %s", request.URL, resp.Status) } }) diff --git a/exporters/otlp/otlptrace/otlptracehttp/client_test.go b/exporters/otlp/otlptrace/otlptracehttp/client_test.go index f1df45672bb0..84e9ab7e6553 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/client_test.go +++ b/exporters/otlp/otlptrace/otlptracehttp/client_test.go @@ -244,6 +244,9 @@ func TestTimeout(t *testing.T) { func TestNoRetry(t *testing.T) { mc := runMockCollector(t, mockCollectorConfig{ InjectHTTPStatus: []int{http.StatusBadRequest}, + Partial: &coltracepb.ExportTracePartialSuccess{ + ErrorMessage: "missing required attribute aaa", + }, }) defer mc.MustStop(t) driver := otlptracehttp.NewClient( @@ -265,9 +268,14 @@ func TestNoRetry(t *testing.T) { }() err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan()) assert.Error(t, err) - unwrapped := errors.Unwrap(err) - assert.Equal(t, fmt.Sprintf("failed to send to http://%s/v1/traces: 400 Bad Request", mc.endpoint), unwrapped.Error()) assert.True(t, strings.HasPrefix(err.Error(), "traces export: ")) + + unwrapped := errors.Unwrap(err) + assert.Contains(t, unwrapped.Error(), fmt.Sprintf("failed to send to http://%s/v1/traces: 400 Bad Request", mc.endpoint)) + + unwrapped2 := errors.Unwrap(unwrapped) + assert.Contains(t, unwrapped2.Error(), "missing required attribute aaa") + assert.Empty(t, mc.GetSpans()) }