From cdbe8e7b8460c6149deab60e1eacec0ba7cd5a13 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Wed, 23 Aug 2023 21:15:26 -0700 Subject: [PATCH] [exporter/otlphttpexporter] Partial success HTTP response handling (#8270) **Description:** Fix the handling of the HTTP response to ignore responses not encoded as protobuf **Link to tracking Issue:** Fixes #8263 --- .../handle-http-response-partial-success.yaml | 25 ++++++++ exporter/otlphttpexporter/otlp.go | 23 +++++-- exporter/otlphttpexporter/otlp_test.go | 64 ++++++++++++++++++- 3 files changed, 103 insertions(+), 9 deletions(-) create mode 100644 .chloggen/handle-http-response-partial-success.yaml diff --git a/.chloggen/handle-http-response-partial-success.yaml b/.chloggen/handle-http-response-partial-success.yaml new file mode 100644 index 00000000000..b3b42c79eaf --- /dev/null +++ b/.chloggen/handle-http-response-partial-success.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: otlphttpexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix the handling of the HTTP response to ignore responses not encoded as protobuf + +# One or more tracking issues or pull requests related to the change +issues: [8263] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index 9e2e92a34d5..5d196372b37 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -47,6 +47,8 @@ type baseExporter struct { const ( headerRetryAfter = "Retry-After" maxHTTPResponseReadBytes = 64 * 1024 + + protobufContentType = "application/x-protobuf" ) // Create new exporter. @@ -118,7 +120,7 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p if err != nil { return consumererror.NewPermanent(err) } - req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("Content-Type", protobufContentType) req.Header.Set("User-Agent", e.userAgent) resp, err := e.client.Do(req) @@ -252,12 +254,15 @@ func handlePartialSuccessResponse(resp *http.Response, partialSuccessHandler par return err } - return partialSuccessHandler(bodyBytes) + return partialSuccessHandler(bodyBytes, resp.Header.Get("Content-Type")) } -type partialSuccessHandler func(protoBytes []byte) error +type partialSuccessHandler func(bytes []byte, contentType string) error -func tracesPartialSuccessHandler(protoBytes []byte) error { +func tracesPartialSuccessHandler(protoBytes []byte, contentType string) error { + if contentType != protobufContentType { + return nil + } exportResponse := ptraceotlp.NewExportResponse() err := exportResponse.UnmarshalProto(protoBytes) if err != nil { @@ -270,7 +275,10 @@ func tracesPartialSuccessHandler(protoBytes []byte) error { return nil } -func metricsPartialSuccessHandler(protoBytes []byte) error { +func metricsPartialSuccessHandler(protoBytes []byte, contentType string) error { + if contentType != protobufContentType { + return nil + } exportResponse := pmetricotlp.NewExportResponse() err := exportResponse.UnmarshalProto(protoBytes) if err != nil { @@ -283,7 +291,10 @@ func metricsPartialSuccessHandler(protoBytes []byte) error { return nil } -func logsPartialSuccessHandler(protoBytes []byte) error { +func logsPartialSuccessHandler(protoBytes []byte, contentType string) error { + if contentType != protobufContentType { + return nil + } exportResponse := plogotlp.NewExportResponse() err := exportResponse.UnmarshalProto(protoBytes) if err != nil { diff --git a/exporter/otlphttpexporter/otlp_test.go b/exporter/otlphttpexporter/otlp_test.go index 64d9223e3f5..f04ec8a6190 100644 --- a/exporter/otlphttpexporter/otlp_test.go +++ b/exporter/otlphttpexporter/otlp_test.go @@ -687,6 +687,7 @@ func TestPartialSuccess_traces(t *testing.T) { partial.SetRejectedSpans(1) bytes, err := response.MarshalProto() require.NoError(t, err) + writer.Header().Set("Content-Type", "application/x-protobuf") _, err = writer.Write(bytes) require.NoError(t, err) }) @@ -720,6 +721,7 @@ func TestPartialSuccess_metrics(t *testing.T) { partial.SetRejectedDataPoints(1) bytes, err := response.MarshalProto() require.NoError(t, err) + writer.Header().Set("Content-Type", "application/x-protobuf") _, err = writer.Write(bytes) require.NoError(t, err) }) @@ -751,9 +753,10 @@ func TestPartialSuccess_logs(t *testing.T) { partial := response.PartialSuccess() partial.SetErrorMessage("hello") partial.SetRejectedLogRecords(1) - bytes, err := response.MarshalProto() + b, err := response.MarshalProto() require.NoError(t, err) - _, err = writer.Write(bytes) + writer.Header().Set("Content-Type", "application/x-protobuf") + _, err = writer.Write(b) require.NoError(t, err) }) defer srv.Close() @@ -789,6 +792,9 @@ func TestPartialResponse_missingHeaderButHasBody(t *testing.T) { // `-1` indicates a missing Content-Length header in the Go http standard library ContentLength: -1, Body: io.NopCloser(bytes.NewReader(data)), + Header: map[string][]string{ + "Content-Type": {"application/x-protobuf"}, + }, } err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) assert.True(t, consumererror.IsPermanent(err)) @@ -799,6 +805,9 @@ func TestPartialResponse_missingHeaderAndBody(t *testing.T) { // `-1` indicates a missing Content-Length header in the Go http standard library ContentLength: -1, Body: io.NopCloser(bytes.NewReader([]byte{})), + Header: map[string][]string{ + "Content-Type": {"application/x-protobuf"}, + }, } err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) assert.Nil(t, err) @@ -824,6 +833,9 @@ func TestPartialSuccess_shortContentLengthHeader(t *testing.T) { resp := &http.Response{ ContentLength: 3, Body: io.NopCloser(bytes.NewReader(data)), + Header: map[string][]string{ + "Content-Type": {"application/x-protobuf"}, + }, } err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) assert.Error(t, err) @@ -839,6 +851,9 @@ func TestPartialSuccess_longContentLengthHeader(t *testing.T) { resp := &http.Response{ ContentLength: 4096, Body: io.NopCloser(bytes.NewReader(data)), + Header: map[string][]string{ + "Content-Type": {"application/x-protobuf"}, + }, } err = handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) assert.Error(t, err) @@ -848,6 +863,9 @@ func TestPartialSuccessInvalidResponseBody(t *testing.T) { resp := &http.Response{ Body: io.NopCloser(badReader{}), ContentLength: 100, + Header: map[string][]string{ + "Content-Type": {protobufContentType}, + }, } err := handlePartialSuccessResponse(resp, tracesPartialSuccessHandler) assert.Error(t, err) @@ -873,12 +891,52 @@ func TestPartialSuccessInvalidBody(t *testing.T) { } for _, tt := range invalidBodyCases { t.Run("Invalid response body_"+tt.telemetryType, func(t *testing.T) { - err := tt.handler([]byte{1}) + err := tt.handler([]byte{1}, "application/x-protobuf") assert.Error(t, err) }) } } +func TestPartialSuccessUnsupportedContentType(t *testing.T) { + unsupportedContentTypeCases := []struct { + contentType string + }{ + { + contentType: "application/json", + }, + { + contentType: "text/plain", + }, + { + contentType: "application/octet-stream", + }, + } + for _, telemetryType := range []string{"logs", "metrics", "traces"} { + for _, tt := range unsupportedContentTypeCases { + t.Run("Unsupported content type "+tt.contentType+" "+telemetryType, func(t *testing.T) { + var handler func(b []byte, contentType string) error + switch telemetryType { + case "logs": + handler = logsPartialSuccessHandler + case "metrics": + handler = metricsPartialSuccessHandler + case "traces": + handler = tracesPartialSuccessHandler + default: + panic(telemetryType) + } + exportResponse := ptraceotlp.NewExportResponse() + exportResponse.PartialSuccess().SetErrorMessage("foo") + exportResponse.PartialSuccess().SetRejectedSpans(42) + b, err := exportResponse.MarshalProto() + require.NoError(t, err) + err = handler(b, tt.contentType) + assert.NoError(t, err) + }) + } + } +} + func createBackend(endpoint string, handler func(writer http.ResponseWriter, request *http.Request)) *httptest.Server { mux := http.NewServeMux() mux.HandleFunc(endpoint, handler)