From 5994b767e09c039e129dc16e943789d92428054c Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Tue, 20 Dec 2022 10:54:50 -0800 Subject: [PATCH 1/6] do not close the body of the request explicitly as net/http does that for us --- .chloggen/fix_close_zip_twice.yaml | 16 ++++++++++++++++ exporter/splunkhecexporter/client.go | 9 --------- exporter/splunkhecexporter/exporter_test.go | 2 +- 3 files changed, 17 insertions(+), 10 deletions(-) create mode 100644 .chloggen/fix_close_zip_twice.yaml diff --git a/.chloggen/fix_close_zip_twice.yaml b/.chloggen/fix_close_zip_twice.yaml new file mode 100644 index 000000000000..c40d8cc16352 --- /dev/null +++ b/.chloggen/fix_close_zip_twice.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: splunkhecexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Don't attempt to close the request body explicitly as go does this when sending data. + +# One or more tracking issues related to the change +issues: [17177] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index 333c85b3ec97..ca85337da9ca 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -242,9 +242,6 @@ func (c *client) pushMetricsData( localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.Str() } } - if err := bufState.Close(); err != nil { - return err - } return c.postEvents(ctx, bufState, localHeaders, bufState.compressionEnabled, bufState.buf.Len()) } @@ -267,9 +264,6 @@ func (c *client) pushTraceData( localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.Str() } } - if err := bufState.Close(); err != nil { - return err - } return c.postEvents(ctx, bufState, localHeaders, bufState.compressionEnabled, bufState.buf.Len()) } @@ -293,9 +287,6 @@ func (c *client) pushLogData(ctx context.Context, ld plog.Logs) error { localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.Str() } } - if err := bufState.Close(); err != nil { - return err - } return c.postEvents(ctx, bufState, localHeaders, bufState.compressionEnabled, bufState.buf.Len()) } diff --git a/exporter/splunkhecexporter/exporter_test.go b/exporter/splunkhecexporter/exporter_test.go index 461c35279476..ce059816778e 100644 --- a/exporter/splunkhecexporter/exporter_test.go +++ b/exporter/splunkhecexporter/exporter_test.go @@ -210,7 +210,7 @@ func TestConsumeMetricsData(t *testing.T) { assert.Equal(t, "Splunk 1234", r.Header.Get("Authorization")) assert.Equal(t, "gzip", r.Header.Get("Content-Encoding")) zipReader, err := gzip.NewReader(bytes.NewReader(body)) - assert.NoError(t, err) + require.NoError(t, err) bodyBytes, _ := io.ReadAll(zipReader) firstPayload := strings.Split(string(bodyBytes), "}{")[0] var metric splunk.Event From a18a1e3d7fd1fe79ef7026c4170405c5c0bf4c27 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Tue, 20 Dec 2022 16:21:08 -0800 Subject: [PATCH 2/6] fix flaky test, make sure we catch that we have a dirty buffer --- exporter/splunkhecexporter/client.go | 77 ++++++++++++----- 
exporter/splunkhecexporter/exporter_test.go | 96 +++++++++++---------- 2 files changed, 109 insertions(+), 64 deletions(-) diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index ca85337da9ca..4afc6e9324e9 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -60,14 +60,21 @@ type bufferState struct { compressionAvailable bool compressionEnabled bool bufferMaxLen uint - writer io.Writer + empty bool + writer acceptingWriter buf *bytes.Buffer bufFront *index resource int library int } +type acceptingWriter interface { + io.Writer + len() (int, error) +} + func (b *bufferState) reset() { + b.empty = true b.buf.Reset() b.compressionEnabled = false b.writer = &cancellableBytesWriter{innerWriter: b.buf, maxCapacity: b.bufferMaxLen} @@ -123,19 +130,29 @@ func (b *bufferState) accept(data []byte) (bool, error) { } } + b.empty = false return true, nil } if overCapacity { return false, nil } + b.empty = false return true, err } +func (b *bufferState) Len() (int, error) { + return b.writer.len() +} + type cancellableBytesWriter struct { innerWriter *bytes.Buffer maxCapacity uint } +func (c *cancellableBytesWriter) len() (int, error) { + return c.innerWriter.Len(), nil +} + func (c *cancellableBytesWriter) Write(b []byte) (int, error) { if c.maxCapacity == 0 { return c.innerWriter.Write(b) @@ -147,19 +164,26 @@ func (c *cancellableBytesWriter) Write(b []byte) (int, error) { } type cancellableGzipWriter struct { - innerBuffer *bytes.Buffer - innerWriter *gzip.Writer - maxCapacity uint - len int + innerBuffer *bytes.Buffer + innerWriter *gzip.Writer + maxCapacity uint + bytesWritten int +} + +func (c *cancellableGzipWriter) len() (int, error) { + if err := c.innerWriter.Flush(); err != nil { + return 0, err + } + return c.innerBuffer.Len(), nil } func (c *cancellableGzipWriter) Write(b []byte) (int, error) { if c.maxCapacity == 0 { return c.innerWriter.Write(b) } - c.len += len(b) + c.bytesWritten += len(b) // if we see that at a 50% compression rate, we'd be over max capacity, start flushing. - if (c.len / 2) > int(c.maxCapacity) { + if (c.bytesWritten / 2) > int(c.maxCapacity) { // we flush so the length of the underlying buffer is accurate. 
if err := c.innerWriter.Flush(); err != nil { return 0, err @@ -242,7 +266,11 @@ func (c *client) pushMetricsData( localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.Str() } } - return c.postEvents(ctx, bufState, localHeaders, bufState.compressionEnabled, bufState.buf.Len()) + bufferLength, err := bufState.Len() + if err != nil { + return err + } + return c.postEvents(ctx, bufState, localHeaders, bufState.compressionEnabled, bufferLength) } return c.pushMetricsDataInBatches(ctx, md, send) @@ -264,7 +292,11 @@ func (c *client) pushTraceData( localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.Str() } } - return c.postEvents(ctx, bufState, localHeaders, bufState.compressionEnabled, bufState.buf.Len()) + bufferLength, err := bufState.Len() + if err != nil { + return err + } + return c.postEvents(ctx, bufState, localHeaders, bufState.compressionEnabled, bufferLength) } return c.pushTracesDataInBatches(ctx, td, send) @@ -287,7 +319,11 @@ func (c *client) pushLogData(ctx context.Context, ld plog.Logs) error { localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.Str() } } - return c.postEvents(ctx, bufState, localHeaders, bufState.compressionEnabled, bufState.buf.Len()) + bufferLength, err := bufState.Len() + if err != nil { + return err + } + return c.postEvents(ctx, bufState, localHeaders, bufState.compressionEnabled, bufferLength) } return c.pushLogDataInBatches(ctx, ld, send) @@ -314,6 +350,7 @@ func makeBlankBufferState(bufCap uint, compressionAvailable bool) *bufferState { return &bufferState{ compressionAvailable: compressionAvailable, compressionEnabled: false, + empty: true, writer: &cancellableBytesWriter{innerWriter: buf, maxCapacity: bufCap}, buf: buf, bufferMaxLen: bufCap, @@ -373,7 +410,7 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, send fu } // There's some leftover unsent non-profiling data - if bufState.buf.Len() > 0 { + if !bufState.empty { if err := send(ctx, bufState, nil); err != nil { return consumererror.NewLogs(err, c.subLogs(ld, bufState.bufFront, profilingBufState.bufFront)) @@ -381,7 +418,7 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, send fu } // There's some leftover unsent profiling data - if profilingBufState.buf.Len() > 0 { + if !profilingBufState.empty { if err := send(ctx, profilingBufState, profilingHeaders); err != nil { // Non-profiling bufFront is set to nil because all non-profiling data was flushed successfully above. 
return consumererror.NewLogs(err, c.subLogs(ld, nil, profilingBufState.bufFront)) @@ -420,7 +457,7 @@ func (c *client) pushLogRecords(ctx context.Context, lds plog.ResourceLogsSlice, continue } - if state.buf.Len() > 0 { + if !state.empty { if err = send(ctx, state, headers); err != nil { return permanentErrors, err } @@ -439,7 +476,7 @@ func (c *client) pushLogRecords(ctx context.Context, lds plog.ResourceLogsSlice, fmt.Errorf("dropped log event error: event size %d bytes larger than configured max content length %d bytes", len(b), state.bufferMaxLen))) continue } - if state.buf.Len() > 0 { + if !state.empty { // This means that the current record had overflown the buffer and was not sent state.bufFront = &index{resource: state.resource, library: state.library, record: k} } else { @@ -486,7 +523,7 @@ func (c *client) pushMetricsRecords(ctx context.Context, mds pmetric.ResourceMet continue } - if state.buf.Len() > 0 { + if !state.empty { if err := send(ctx, state); err != nil { return permanentErrors, err } @@ -506,7 +543,7 @@ func (c *client) pushMetricsRecords(ctx context.Context, mds pmetric.ResourceMet continue } - if state.buf.Len() > 0 { + if !state.empty { // This means that the current record had overflown the buffer and was not sent state.bufFront = &index{resource: state.resource, library: state.library, record: k} } else { @@ -548,7 +585,7 @@ func (c *client) pushTracesData(ctx context.Context, tds ptrace.ResourceSpansSli continue } - if state.buf.Len() > 0 { + if !state.empty { if err = send(ctx, state); err != nil { return permanentErrors, err } @@ -568,7 +605,7 @@ func (c *client) pushTracesData(ctx context.Context, tds ptrace.ResourceSpansSli continue } - if state.buf.Len() > 0 { + if !state.empty { // This means that the current record had overflown the buffer and was not sent state.bufFront = &index{resource: state.resource, library: state.library, record: k} } else { @@ -607,7 +644,7 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric } // There's some leftover unsent metrics - if bufState.buf.Len() > 0 { + if !bufState.empty { if err := send(ctx, bufState); err != nil { return consumererror.NewMetrics(err, subMetrics(md, bufState.bufFront)) } @@ -642,7 +679,7 @@ func (c *client) pushTracesDataInBatches(ctx context.Context, td ptrace.Traces, } // There's some leftover unsent traces - if bufState.buf.Len() > 0 { + if !bufState.empty { if err := send(ctx, bufState); err != nil { return consumererror.NewTraces(err, subTraces(td, bufState.bufFront)) } diff --git a/exporter/splunkhecexporter/exporter_test.go b/exporter/splunkhecexporter/exporter_test.go index ce059816778e..152bacb62e36 100644 --- a/exporter/splunkhecexporter/exporter_test.go +++ b/exporter/splunkhecexporter/exporter_test.go @@ -19,6 +19,7 @@ import ( "compress/gzip" "context" "encoding/json" + "fmt" "io" "net" "net/http" @@ -158,7 +159,7 @@ func TestConsumeMetricsData(t *testing.T) { md pmetric.Metrics reqTestFunc func(t *testing.T, r *http.Request) httpResponseCode int - maxContentLength int + maxContentLength uint wantErr bool }{ { @@ -201,23 +202,28 @@ func TestConsumeMetricsData(t *testing.T) { md: generateLargeBatch(), reqTestFunc: func(t *testing.T, r *http.Request) { body, err := io.ReadAll(r.Body) - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err) assert.Equal(t, "keep-alive", r.Header.Get("Connection")) assert.Equal(t, "application/json", r.Header.Get("Content-Type")) assert.Equal(t, "OpenTelemetry-Collector Splunk Exporter/v0.0.1", 
r.Header.Get("User-Agent")) assert.Equal(t, "Splunk 1234", r.Header.Get("Authorization")) - assert.Equal(t, "gzip", r.Header.Get("Content-Encoding")) - zipReader, err := gzip.NewReader(bytes.NewReader(body)) - require.NoError(t, err) - bodyBytes, _ := io.ReadAll(zipReader) - firstPayload := strings.Split(string(bodyBytes), "}{")[0] - var metric splunk.Event - err = json.Unmarshal([]byte(firstPayload+"}"), &metric) - if err != nil { - t.Fatal(err) + bodyBytes := body + // the last batch might not be zipped. + if "gzip" == r.Header.Get("Content-Encoding") { + zipReader, err := gzip.NewReader(bytes.NewReader(body)) + require.NoError(t, err) + bodyBytes, _ = io.ReadAll(zipReader) } + + events := strings.Split(string(bodyBytes), "}{") + firstPayload := events[0] + if len(events) > 1 { + firstPayload += "}" + } + + var metric splunk.Event + err = json.Unmarshal([]byte(firstPayload), &metric) + assert.NoError(t, err, fmt.Sprintf("could not read: %s", firstPayload)) assert.Equal(t, "test_splunk", metric.Source) assert.Equal(t, "test_type", metric.SourceType) assert.Equal(t, "test_index", metric.Index) @@ -229,41 +235,43 @@ func TestConsumeMetricsData(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if tt.reqTestFunc != nil { - tt.reqTestFunc(t, r) + for i := 0; i < 100; i++ { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if tt.reqTestFunc != nil { + tt.reqTestFunc(t, r) + } + w.WriteHeader(tt.httpResponseCode) + })) + defer server.Close() + + serverURL, err := url.Parse(server.URL) + assert.NoError(t, err) + + options := &exporterOptions{ + url: serverURL, + token: "1234", } - w.WriteHeader(tt.httpResponseCode) - })) - defer server.Close() - serverURL, err := url.Parse(server.URL) - assert.NoError(t, err) - - options := &exporterOptions{ - url: serverURL, - token: "1234", - } - - config := NewFactory().CreateDefaultConfig().(*Config) - config.Source = "test" - config.SourceType = "test_type" - config.Token = "1234" - config.Index = "test_index" - config.SplunkAppName = "OpenTelemetry-Collector Splunk Exporter" - config.SplunkAppVersion = "v0.0.1" - config.MaxContentLengthMetrics = 1800 - - sender, err := buildClient(options, config, zap.NewNop()) - assert.NoError(t, err) + config := NewFactory().CreateDefaultConfig().(*Config) + config.Source = "test" + config.SourceType = "test_type" + config.Token = "1234" + config.Index = "test_index" + config.SplunkAppName = "OpenTelemetry-Collector Splunk Exporter" + config.SplunkAppVersion = "v0.0.1" + config.MaxContentLengthMetrics = tt.maxContentLength + + sender, err := buildClient(options, config, zap.NewNop()) + assert.NoError(t, err) + + err = sender.pushMetricsData(context.Background(), tt.md) + if tt.wantErr { + assert.Error(t, err) + return + } - err = sender.pushMetricsData(context.Background(), tt.md) - if tt.wantErr { - assert.Error(t, err) - return + assert.NoError(t, err) } - - assert.NoError(t, err) }) } } From 5570c7dbc44fd3d07bb16795bdf62395cf95ef64 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Tue, 20 Dec 2022 16:23:55 -0800 Subject: [PATCH 3/6] remove the loop to run the export 100 times --- exporter/splunkhecexporter/exporter_test.go | 60 ++++++++++----------- 1 file changed, 29 insertions(+), 31 deletions(-) diff --git a/exporter/splunkhecexporter/exporter_test.go b/exporter/splunkhecexporter/exporter_test.go index 152bacb62e36..0b42183a111d 100644 --- 
a/exporter/splunkhecexporter/exporter_test.go +++ b/exporter/splunkhecexporter/exporter_test.go @@ -235,43 +235,41 @@ func TestConsumeMetricsData(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - for i := 0; i < 100; i++ { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if tt.reqTestFunc != nil { - tt.reqTestFunc(t, r) - } - w.WriteHeader(tt.httpResponseCode) - })) - defer server.Close() - - serverURL, err := url.Parse(server.URL) - assert.NoError(t, err) - - options := &exporterOptions{ - url: serverURL, - token: "1234", + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if tt.reqTestFunc != nil { + tt.reqTestFunc(t, r) } + w.WriteHeader(tt.httpResponseCode) + })) + defer server.Close() - config := NewFactory().CreateDefaultConfig().(*Config) - config.Source = "test" - config.SourceType = "test_type" - config.Token = "1234" - config.Index = "test_index" - config.SplunkAppName = "OpenTelemetry-Collector Splunk Exporter" - config.SplunkAppVersion = "v0.0.1" - config.MaxContentLengthMetrics = tt.maxContentLength + serverURL, err := url.Parse(server.URL) + assert.NoError(t, err) - sender, err := buildClient(options, config, zap.NewNop()) - assert.NoError(t, err) + options := &exporterOptions{ + url: serverURL, + token: "1234", + } - err = sender.pushMetricsData(context.Background(), tt.md) - if tt.wantErr { - assert.Error(t, err) - return - } + config := NewFactory().CreateDefaultConfig().(*Config) + config.Source = "test" + config.SourceType = "test_type" + config.Token = "1234" + config.Index = "test_index" + config.SplunkAppName = "OpenTelemetry-Collector Splunk Exporter" + config.SplunkAppVersion = "v0.0.1" + config.MaxContentLengthMetrics = tt.maxContentLength - assert.NoError(t, err) + sender, err := buildClient(options, config, zap.NewNop()) + assert.NoError(t, err) + + err = sender.pushMetricsData(context.Background(), tt.md) + if tt.wantErr { + assert.Error(t, err) + return } + + assert.NoError(t, err) }) } } From a5ee6b771cfcd09f857b24d39d8e38b14644eb6f Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Tue, 20 Dec 2022 16:32:13 -0800 Subject: [PATCH 4/6] simplify close logic --- exporter/splunkhecexporter/client.go | 80 ++++++++++------------------ 1 file changed, 29 insertions(+), 51 deletions(-) diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index 4afc6e9324e9..610360cf10fd 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -60,21 +60,16 @@ type bufferState struct { compressionAvailable bool compressionEnabled bool bufferMaxLen uint - empty bool - writer acceptingWriter + writer io.Writer buf *bytes.Buffer bufFront *index resource int library int -} - -type acceptingWriter interface { - io.Writer - len() (int, error) + closed bool } func (b *bufferState) reset() { - b.empty = true + b.closed = false b.buf.Reset() b.compressionEnabled = false b.writer = &cancellableBytesWriter{innerWriter: b.buf, maxCapacity: b.bufferMaxLen} @@ -85,6 +80,10 @@ func (b *bufferState) Read(p []byte) (n int, err error) { } func (b *bufferState) Close() error { + if b.closed { + return nil + } + b.closed = true if _, ok := b.writer.(*cancellableGzipWriter); ok { return b.writer.(*cancellableGzipWriter).close() } @@ -130,29 +129,19 @@ func (b *bufferState) accept(data []byte) (bool, error) { } } - b.empty = false return true, nil } if overCapacity { return false, nil } - b.empty = false 
return true, err } -func (b *bufferState) Len() (int, error) { - return b.writer.len() -} - type cancellableBytesWriter struct { innerWriter *bytes.Buffer maxCapacity uint } -func (c *cancellableBytesWriter) len() (int, error) { - return c.innerWriter.Len(), nil -} - func (c *cancellableBytesWriter) Write(b []byte) (int, error) { if c.maxCapacity == 0 { return c.innerWriter.Write(b) @@ -164,26 +153,19 @@ func (c *cancellableBytesWriter) Write(b []byte) (int, error) { } type cancellableGzipWriter struct { - innerBuffer *bytes.Buffer - innerWriter *gzip.Writer - maxCapacity uint - bytesWritten int -} - -func (c *cancellableGzipWriter) len() (int, error) { - if err := c.innerWriter.Flush(); err != nil { - return 0, err - } - return c.innerBuffer.Len(), nil + innerBuffer *bytes.Buffer + innerWriter *gzip.Writer + maxCapacity uint + len int } func (c *cancellableGzipWriter) Write(b []byte) (int, error) { if c.maxCapacity == 0 { return c.innerWriter.Write(b) } - c.bytesWritten += len(b) + c.len += len(b) // if we see that at a 50% compression rate, we'd be over max capacity, start flushing. - if (c.bytesWritten / 2) > int(c.maxCapacity) { + if (c.len / 2) > int(c.maxCapacity) { // we flush so the length of the underlying buffer is accurate. if err := c.innerWriter.Flush(); err != nil { return 0, err @@ -266,11 +248,10 @@ func (c *client) pushMetricsData( localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.Str() } } - bufferLength, err := bufState.Len() - if err != nil { + if err := bufState.Close(); err != nil { return err } - return c.postEvents(ctx, bufState, localHeaders, bufState.compressionEnabled, bufferLength) + return c.postEvents(ctx, bufState, localHeaders, bufState.compressionEnabled, bufState.buf.Len()) } return c.pushMetricsDataInBatches(ctx, md, send) @@ -292,11 +273,10 @@ func (c *client) pushTraceData( localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.Str() } } - bufferLength, err := bufState.Len() - if err != nil { + if err := bufState.Close(); err != nil { return err } - return c.postEvents(ctx, bufState, localHeaders, bufState.compressionEnabled, bufferLength) + return c.postEvents(ctx, bufState, localHeaders, bufState.compressionEnabled, bufState.buf.Len()) } return c.pushTracesDataInBatches(ctx, td, send) @@ -319,11 +299,10 @@ func (c *client) pushLogData(ctx context.Context, ld plog.Logs) error { localHeaders["Authorization"] = splunk.HECTokenHeader + " " + accessToken.Str() } } - bufferLength, err := bufState.Len() - if err != nil { + if err := bufState.Close(); err != nil { return err } - return c.postEvents(ctx, bufState, localHeaders, bufState.compressionEnabled, bufferLength) + return c.postEvents(ctx, bufState, localHeaders, bufState.compressionEnabled, bufState.buf.Len()) } return c.pushLogDataInBatches(ctx, ld, send) @@ -350,7 +329,6 @@ func makeBlankBufferState(bufCap uint, compressionAvailable bool) *bufferState { return &bufferState{ compressionAvailable: compressionAvailable, compressionEnabled: false, - empty: true, writer: &cancellableBytesWriter{innerWriter: buf, maxCapacity: bufCap}, buf: buf, bufferMaxLen: bufCap, @@ -410,7 +388,7 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, send fu } // There's some leftover unsent non-profiling data - if !bufState.empty { + if bufState.buf.Len() > 0 { if err := send(ctx, bufState, nil); err != nil { return consumererror.NewLogs(err, c.subLogs(ld, bufState.bufFront, profilingBufState.bufFront)) @@ -418,7 +396,7 @@ func (c *client) 
pushLogDataInBatches(ctx context.Context, ld plog.Logs, send fu } // There's some leftover unsent profiling data - if !profilingBufState.empty { + if profilingBufState.buf.Len() > 0 { if err := send(ctx, profilingBufState, profilingHeaders); err != nil { // Non-profiling bufFront is set to nil because all non-profiling data was flushed successfully above. return consumererror.NewLogs(err, c.subLogs(ld, nil, profilingBufState.bufFront)) @@ -457,7 +435,7 @@ func (c *client) pushLogRecords(ctx context.Context, lds plog.ResourceLogsSlice, continue } - if !state.empty { + if state.buf.Len() > 0 { if err = send(ctx, state, headers); err != nil { return permanentErrors, err } @@ -476,7 +454,7 @@ func (c *client) pushLogRecords(ctx context.Context, lds plog.ResourceLogsSlice, fmt.Errorf("dropped log event error: event size %d bytes larger than configured max content length %d bytes", len(b), state.bufferMaxLen))) continue } - if !state.empty { + if state.buf.Len() > 0 { // This means that the current record had overflown the buffer and was not sent state.bufFront = &index{resource: state.resource, library: state.library, record: k} } else { @@ -523,7 +501,7 @@ func (c *client) pushMetricsRecords(ctx context.Context, mds pmetric.ResourceMet continue } - if !state.empty { + if state.buf.Len() > 0 { if err := send(ctx, state); err != nil { return permanentErrors, err } @@ -543,7 +521,7 @@ func (c *client) pushMetricsRecords(ctx context.Context, mds pmetric.ResourceMet continue } - if !state.empty { + if state.buf.Len() > 0 { // This means that the current record had overflown the buffer and was not sent state.bufFront = &index{resource: state.resource, library: state.library, record: k} } else { @@ -585,7 +563,7 @@ func (c *client) pushTracesData(ctx context.Context, tds ptrace.ResourceSpansSli continue } - if !state.empty { + if state.buf.Len() > 0 { if err = send(ctx, state); err != nil { return permanentErrors, err } @@ -605,7 +583,7 @@ func (c *client) pushTracesData(ctx context.Context, tds ptrace.ResourceSpansSli continue } - if !state.empty { + if state.buf.Len() > 0 { // This means that the current record had overflown the buffer and was not sent state.bufFront = &index{resource: state.resource, library: state.library, record: k} } else { @@ -644,7 +622,7 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric } // There's some leftover unsent metrics - if !bufState.empty { + if bufState.buf.Len() > 0 { if err := send(ctx, bufState); err != nil { return consumererror.NewMetrics(err, subMetrics(md, bufState.bufFront)) } @@ -679,7 +657,7 @@ func (c *client) pushTracesDataInBatches(ctx context.Context, td ptrace.Traces, } // There's some leftover unsent traces - if !bufState.empty { + if bufState.buf.Len() > 0 { if err := send(ctx, bufState); err != nil { return consumererror.NewTraces(err, subTraces(td, bufState.bufFront)) } From 751429cb346a32d219f25ba56505da94d90c7475 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Tue, 20 Dec 2022 17:02:50 -0800 Subject: [PATCH 5/6] remove closed flag and fix govet error --- exporter/splunkhecexporter/client.go | 6 ------ exporter/splunkhecexporter/exporter_test.go | 4 ++-- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index 610360cf10fd..333c85b3ec97 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -65,11 +65,9 @@ type bufferState struct { bufFront *index resource int library int - closed 
bool } func (b *bufferState) reset() { - b.closed = false b.buf.Reset() b.compressionEnabled = false b.writer = &cancellableBytesWriter{innerWriter: b.buf, maxCapacity: b.bufferMaxLen} @@ -80,10 +78,6 @@ func (b *bufferState) Read(p []byte) (n int, err error) { } func (b *bufferState) Close() error { - if b.closed { - return nil - } - b.closed = true if _, ok := b.writer.(*cancellableGzipWriter); ok { return b.writer.(*cancellableGzipWriter).close() } diff --git a/exporter/splunkhecexporter/exporter_test.go b/exporter/splunkhecexporter/exporter_test.go index 0b42183a111d..a274039617b5 100644 --- a/exporter/splunkhecexporter/exporter_test.go +++ b/exporter/splunkhecexporter/exporter_test.go @@ -210,8 +210,8 @@ func TestConsumeMetricsData(t *testing.T) { bodyBytes := body // the last batch might not be zipped. if "gzip" == r.Header.Get("Content-Encoding") { - zipReader, err := gzip.NewReader(bytes.NewReader(body)) - require.NoError(t, err) + zipReader, err2 := gzip.NewReader(bytes.NewReader(body)) + require.NoError(t, err2) bodyBytes, _ = io.ReadAll(zipReader) } From b841b199d48c706e2bb490faba3871d278b2dd32 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Tue, 20 Dec 2022 17:04:25 -0800 Subject: [PATCH 6/6] remove changelog entry --- .chloggen/fix_close_zip_twice.yaml | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 .chloggen/fix_close_zip_twice.yaml diff --git a/.chloggen/fix_close_zip_twice.yaml b/.chloggen/fix_close_zip_twice.yaml deleted file mode 100644 index c40d8cc16352..000000000000 --- a/.chloggen/fix_close_zip_twice.yaml +++ /dev/null @@ -1,16 +0,0 @@ -# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' -change_type: bug_fix - -# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) -component: splunkhecexporter - -# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Don't attempt to close the request body explicitly as go does this when sending data. - -# One or more tracking issues related to the change -issues: [17177] - -# (Optional) One or more lines of additional information to render under the primary note. -# These lines will be padded with 2 spaces and then inserted directly into the document. -# Use pipe (|) for multiline entries. -subtext:
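
The rationale for PATCH 1/6 is a documented net/http contract: for client requests, a non-nil request Body is closed by the underlying Transport, even on errors, so callers must not close it again themselves. In the exporter, the body is a bufferState whose Close finalizes the gzip stream, which is why an extra explicit Close amounted to closing the stream twice. Below is a minimal sketch of that contract, assuming a placeholder endpoint and payload (neither is taken from the patch):

```go
package main

import (
	"bytes"
	"compress/gzip"
	"net/http"
)

// post gzips a payload and sends it. The gzip stream is finalized exactly
// once, before the buffer becomes the request body; after that, net/http's
// Transport owns the body and closes it itself, even if Do returns an error.
func post(url string, payload []byte) error {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(payload); err != nil {
		return err
	}
	// Close finalizes the gzip stream and writes the footer; it must
	// happen exactly once, before the buffer is handed to the request.
	if err := zw.Close(); err != nil {
		return err
	}
	req, err := http.NewRequest(http.MethodPost, url, &buf)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Encoding", "gzip")
	resp, err := http.DefaultClient.Do(req) // Transport closes req.Body for us
	if err != nil {
		return err
	}
	return resp.Body.Close() // the response body is still the caller's to close
}

func main() {
	// Placeholder call site; the URL is illustrative only.
	_ = post("http://localhost:8088/services/collector", []byte(`{"event":"hello"}`))
}
```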
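
PATCH 2/6 replaces `bufState.buf.Len()` with a `Len()` that flushes the gzip writer first, and replaces `buf.Len() > 0` dirty-buffer checks with an `empty` flag. Both changes follow from the same standard-library behavior: compress/gzip buffers compressed output internally, so the length of the destination buffer lags behind what has actually been written. A self-contained sketch of that behavior:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

func main() {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)

	if _, err := zw.Write([]byte(`{"event":"metric"}`)); err != nil {
		panic(err)
	}
	// Only the 10-byte gzip header has reached buf at this point; the
	// compressed payload is still buffered inside the writer.
	fmt.Println("before Flush:", buf.Len())

	if err := zw.Flush(); err != nil {
		panic(err)
	}
	// Flush forces the pending payload out, so buf.Len() now reflects the
	// compressed size written so far.
	fmt.Println("after Flush:", buf.Len())
}
```

This is why `cancellableGzipWriter.len()` in the patch calls `Flush()` before reading `innerBuffer.Len()`: without the flush, the reported length undercounts whatever is still pending inside the gzip writer.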
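
PATCH 4/6 guards `bufferState.Close()` with a `closed` flag so that the exporter's explicit close and the close performed by net/http cannot finalize the stream twice, and PATCH 5/6 then drops the flag again. The flag is redundant, presumably the reason it could be removed, because `gzip.Writer.Close` is itself idempotent: a second call returns nil rather than an error. A quick sketch, assuming the standard-library behavior current at the time of the patch:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

func main() {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte("payload")); err != nil {
		panic(err)
	}
	fmt.Println(zw.Close()) // <nil>: finalizes the stream and writes the footer
	fmt.Println(zw.Close()) // <nil>: a repeated Close is a no-op, not an error
}
```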