diff --git a/.chloggen/splunkhec-gzip-pool-data-race-fix.yaml b/.chloggen/splunkhec-gzip-pool-data-race-fix.yaml
new file mode 100644
index 000000000000..e6869bcb40fc
--- /dev/null
+++ b/.chloggen/splunkhec-gzip-pool-data-race-fix.yaml
@@ -0,0 +1,16 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: splunkhecexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Fix data race when gzip compression is enabled
+
+# One or more tracking issues related to the change
+issues: [17083]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: Remove the gzip writer pool and create a new gzip writer when needed.
diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go
index 181e91f9e77a..333c85b3ec97 100644
--- a/exporter/splunkhecexporter/client.go
+++ b/exporter/splunkhecexporter/client.go
@@ -53,7 +53,6 @@ type client struct {
 	logger *zap.Logger
 	wg sync.WaitGroup
 	headers map[string]string
-	gzipWriterPool *sync.Pool
 }
 
 // bufferState encapsulates intermediate buffer state when pushing data
@@ -66,7 +65,6 @@ type bufferState struct {
 	bufFront *index
 	resource int
 	library int
-	gzipWriterPool *sync.Pool
 }
 
 func (b *bufferState) reset() {
@@ -97,14 +95,13 @@ func (b *bufferState) accept(data []byte) (bool, error) {
 	if b.compressionAvailable && !b.compressionEnabled && bufLen > minCompressionLen {
 		// switch over to a zip buffer.
 		tmpBuf := bytes.NewBuffer(make([]byte, 0, b.bufferMaxLen+bufCapPadding))
-		writer := b.gzipWriterPool.Get().(*gzip.Writer)
+		writer := gzip.NewWriter(tmpBuf)
 		writer.Reset(tmpBuf)
 		zipWriter := &cancellableGzipWriter{
 			innerBuffer: tmpBuf,
 			innerWriter: writer,
 			// 8 bytes required for the zip footer.
-			maxCapacity:    b.bufferMaxLen - 8,
-			gzipWriterPool: b.gzipWriterPool,
+			maxCapacity: b.bufferMaxLen - 8,
 		}
 
 		if b.bufferMaxLen == 0 {
@@ -150,11 +147,10 @@ func (c *cancellableBytesWriter) Write(b []byte) (int, error) {
 }
 
 type cancellableGzipWriter struct {
-	innerBuffer    *bytes.Buffer
-	innerWriter    *gzip.Writer
-	maxCapacity    uint
-	gzipWriterPool *sync.Pool
-	len            int
+	innerBuffer *bytes.Buffer
+	innerWriter *gzip.Writer
+	maxCapacity uint
+	len         int
 }
 
 func (c *cancellableGzipWriter) Write(b []byte) (int, error) {
@@ -174,8 +170,7 @@ func (c *cancellableGzipWriter) Write(b []byte) (int, error) {
 		// so we create a copy of our content and add this new data, compressed, to check that it fits.
 		copyBuf := bytes.NewBuffer(make([]byte, 0, c.maxCapacity+bufCapPadding))
 		copyBuf.Write(c.innerBuffer.Bytes())
-		writerCopy := c.gzipWriterPool.Get().(*gzip.Writer)
-		defer c.gzipWriterPool.Put(writerCopy)
+		writerCopy := gzip.NewWriter(copyBuf)
 		writerCopy.Reset(copyBuf)
 		if _, err := writerCopy.Write(b); err != nil {
 			return 0, err
@@ -192,9 +187,7 @@ func (c *cancellableGzipWriter) Write(b []byte) (int, error) {
 }
 
 func (c *cancellableGzipWriter) close() error {
-	err := c.innerWriter.Close()
-	c.gzipWriterPool.Put(c.innerWriter)
-	return err
+	return c.innerWriter.Close()
 }
 
 // Composite index of a record.
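Reviewer note on the hunks above: `accept` now builds a fresh `*gzip.Writer` per buffer instead of borrowing one from a shared `sync.Pool`, and `close` no longer returns the writer to a pool. A minimal sketch of the lifecycle the new code relies on (my construction, not code from this PR; `gzipPayload` is a hypothetical name) — note that `Close`, not `Flush`, emits the 8-byte gzip footer (CRC-32 plus uncompressed length) that `maxCapacity: b.bufferMaxLen - 8` reserves room for:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// gzipPayload compresses payload with a writer that lives and dies with this
// call. Nothing is shared between goroutines, so nothing can race.
func gzipPayload(payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf) // fresh writer per buffer, no pool
	if _, err := zw.Write(payload); err != nil {
		return nil, err
	}
	// Close flushes pending data and appends the 8-byte footer
	// (CRC-32 and length); skipping it truncates the stream.
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	out, err := gzipPayload([]byte(`{"event":"hello"}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("compressed to %d bytes\n", len(out))
}
```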
@@ -323,7 +316,7 @@ func isProfilingData(sl plog.ScopeLogs) bool {
 	return sl.Scope().Name() == profilingLibraryName
 }
 
-func makeBlankBufferState(bufCap uint, compressionAvailable bool, pool *sync.Pool) *bufferState {
+func makeBlankBufferState(bufCap uint, compressionAvailable bool) *bufferState {
 	// Buffer of JSON encoded Splunk events, last record is expected to overflow bufCap, hence the padding
 	buf := bytes.NewBuffer(make([]byte, 0, bufCap+bufCapPadding))
 
@@ -336,7 +329,6 @@ func makeBlankBufferState(bufCap uint, compressionAvailable bool, pool *sync.Poo
 		bufFront: nil, // Index of the log record of the first unsent event in buffer.
 		resource: 0, // Index of currently processed Resource
 		library: 0, // Index of currently processed Library
-		gzipWriterPool: pool,
 	}
 }
 
@@ -346,8 +338,8 @@ func makeBlankBufferState(bufCap uint, compressionAvailable bool, pool *sync.Poo
 // The input data may contain both logs and profiling data.
 // They are batched separately and sent with different HTTP headers
 func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, send func(context.Context, *bufferState, map[string]string) error) error {
-	var bufState = makeBlankBufferState(c.config.MaxContentLengthLogs, !c.config.DisableCompression, c.gzipWriterPool)
-	var profilingBufState = makeBlankBufferState(c.config.MaxContentLengthLogs, !c.config.DisableCompression, c.gzipWriterPool)
+	var bufState = makeBlankBufferState(c.config.MaxContentLengthLogs, !c.config.DisableCompression)
+	var profilingBufState = makeBlankBufferState(c.config.MaxContentLengthLogs, !c.config.DisableCompression)
 	var permanentErrors []error
 
 	var rls = ld.ResourceLogs()
@@ -602,7 +594,7 @@ func (c *client) pushTracesData(ctx context.Context, tds ptrace.ResourceSpansSli
 // The batch content length is restricted to MaxContentLengthMetrics.
 // md metrics are parsed to Splunk events.
 func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metrics, send func(context.Context, *bufferState) error) error {
-	var bufState = makeBlankBufferState(c.config.MaxContentLengthMetrics, !c.config.DisableCompression, c.gzipWriterPool)
+	var bufState = makeBlankBufferState(c.config.MaxContentLengthMetrics, !c.config.DisableCompression)
 	var permanentErrors []error
 
 	var rms = md.ResourceMetrics()
@@ -637,7 +629,7 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric
 // The batch content length is restricted to MaxContentLengthMetrics.
 // td traces are parsed to Splunk events.
 func (c *client) pushTracesDataInBatches(ctx context.Context, td ptrace.Traces, send func(context.Context, *bufferState) error) error {
-	bufState := makeBlankBufferState(c.config.MaxContentLengthTraces, !c.config.DisableCompression, c.gzipWriterPool)
+	bufState := makeBlankBufferState(c.config.MaxContentLengthTraces, !c.config.DisableCompression)
 	var permanentErrors []error
 
 	var rts = td.ResourceSpans()
diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go
index 101b03fe9db5..7f0d89bd101f 100644
--- a/exporter/splunkhecexporter/client_test.go
+++ b/exporter/splunkhecexporter/client_test.go
@@ -28,7 +28,6 @@ import (
 	"regexp"
 	"sort"
 	"strings"
-	"sync"
 	"testing"
 	"time"
 
@@ -1023,9 +1022,6 @@ func Test_pushLogData_nil_Logs(t *testing.T) {
 	c := client{
 		config: NewFactory().CreateDefaultConfig().(*Config),
 		logger: zaptest.NewLogger(t),
-		gzipWriterPool: &sync.Pool{New: func() interface{} {
-			return gzip.NewWriter(nil)
-		}},
 	}
 
 	for _, test := range tests {
@@ -1061,9 +1057,6 @@ func Test_pushLogData_PostError(t *testing.T) {
 		url:    &url.URL{Host: "in va lid"},
 		config: NewFactory().CreateDefaultConfig().(*Config),
 		logger: zaptest.NewLogger(t),
-		gzipWriterPool: &sync.Pool{New: func() interface{} {
-			return gzip.NewWriter(nil)
-		}},
 	}
 
 	// 2000 log records -> ~371888 bytes when JSON encoded.
@@ -1104,9 +1097,6 @@ func Test_pushLogData_ShouldAddResponseTo400Error(t *testing.T) {
 		url:    &url.URL{Scheme: "http", Host: "splunk"},
 		config: NewFactory().CreateDefaultConfig().(*Config),
 		logger: zaptest.NewLogger(t),
-		gzipWriterPool: &sync.Pool{New: func() interface{} {
-			return gzip.NewWriter(nil)
-		}},
 	}
 	logs := createLogData(1, 1, 1)
 
@@ -1139,9 +1129,6 @@ func Test_pushLogData_ShouldReturnUnsentLogsOnly(t *testing.T) {
 		url:    &url.URL{Scheme: "http", Host: "splunk"},
 		config: config,
 		logger: zaptest.NewLogger(t),
-		gzipWriterPool: &sync.Pool{New: func() interface{} {
-			return gzip.NewWriter(nil)
-		}},
 	}
 
 	// Just two records
@@ -1169,9 +1156,6 @@ func Test_pushLogData_ShouldAddHeadersForProfilingData(t *testing.T) {
 		url:    &url.URL{Scheme: "http", Host: "splunk"},
 		config: NewFactory().CreateDefaultConfig().(*Config),
 		logger: zaptest.NewLogger(t),
-		gzipWriterPool: &sync.Pool{New: func() interface{} {
-			return gzip.NewWriter(nil)
-		}},
 	}
 
 	logs := createLogDataWithCustomLibraries(1, []string{"otel.logs", "otel.profiling"}, []int{10, 20})
@@ -1238,9 +1222,6 @@ func benchPushLogData(b *testing.B, numResources int, numProfiling int, numNonPr
 		url:    &url.URL{Scheme: "http", Host: "splunk"},
 		config: NewFactory().CreateDefaultConfig().(*Config),
 		logger: zaptest.NewLogger(b),
-		gzipWriterPool: &sync.Pool{New: func() interface{} {
-			return gzip.NewWriter(nil)
-		}},
 	}
 	c.client, _ = newTestClient(200, "OK")
 
@@ -1261,9 +1242,6 @@ func Test_pushLogData_Small_MaxContentLength(t *testing.T) {
 		logger: zaptest.NewLogger(t),
 		url:    &url.URL{Scheme: "http", Host: "splunk"},
 		client: http.DefaultClient,
-		gzipWriterPool: &sync.Pool{New: func() interface{} {
-			return gzip.NewWriter(nil)
-		}},
 	}
 	c.config.MaxContentLengthLogs = 1
 
@@ -1339,9 +1317,6 @@ func TestSubLogs(t *testing.T) {
 
 	c := client{
 		config: NewFactory().CreateDefaultConfig().(*Config),
-		gzipWriterPool: &sync.Pool{New: func() interface{} {
-			return gzip.NewWriter(nil)
-		}},
 	}
 
 	// Logs subset from leftmost index (resource 0, library 0, record 0).
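Aside on the trade-off: pooling gzip writers is a legitimate allocation optimization (`gzip.NewWriter` allocates sizable internal buffers), but it is only safe under a strict discipline — Get, Reset, write, Close, Put, and never touch the instance after Put. A hedged sketch of that discipline (hypothetical `gzipWithPool` helper; this is the alternative the PR declines, dropping the pool entirely and trading some allocation churn for obvious correctness):

```go
package gzipsketch

import (
	"bytes"
	"compress/gzip"
	"sync"
)

var gzipPool = sync.Pool{New: func() interface{} { return gzip.NewWriter(nil) }}

// gzipWithPool shows the discipline a pooled writer requires: the writer is
// returned to the pool exactly once, and is never referenced again after Put.
func gzipWithPool(payload []byte) ([]byte, error) {
	zw := gzipPool.Get().(*gzip.Writer)
	var buf bytes.Buffer
	zw.Reset(&buf) // rebind the pooled writer to this call's buffer
	_, werr := zw.Write(payload)
	cerr := zw.Close() // flush and write the footer before giving the writer back
	gzipPool.Put(zw)   // single Put; zw must not be used past this line
	if werr != nil {
		return nil, werr
	}
	if cerr != nil {
		return nil, cerr
	}
	return buf.Bytes(), nil
}
```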
@@ -1413,9 +1388,7 @@ func TestHecHealthCheckFailed(t *testing.T) {
 		url:    &url.URL{Scheme: "http", Host: "splunk"},
 		config: NewFactory().CreateDefaultConfig().(*Config),
 		logger: zaptest.NewLogger(t),
-		gzipWriterPool: &sync.Pool{New: func() interface{} {
-			return gzip.NewWriter(nil)
-		}},
+		healthCheckURL: &url.URL{Scheme: "http", Host: "splunk", Path: "/services/collector/health"},
 	}
 	c.client, _ = newTestClient(503, "NOK")
 
@@ -1429,9 +1402,7 @@ func TestHecHealthCheckSucceded(t *testing.T) {
 		url:    &url.URL{Scheme: "http", Host: "splunk"},
 		config: NewFactory().CreateDefaultConfig().(*Config),
 		logger: zaptest.NewLogger(t),
-		gzipWriterPool: &sync.Pool{New: func() interface{} {
-			return gzip.NewWriter(nil)
-		}},
+		healthCheckURL: &url.URL{Scheme: "http", Host: "splunk", Path: "/services/collector/health"},
 	}
 	c.client, _ = newTestClient(200, "OK")
 
@@ -1459,16 +1430,11 @@ func BenchmarkPushLogRecords(b *testing.B) {
 		url:    &url.URL{Scheme: "http", Host: "splunk"},
 		config: NewFactory().CreateDefaultConfig().(*Config),
 		logger: zap.NewNop(),
-		gzipWriterPool: &sync.Pool{New: func() interface{} {
-			return gzip.NewWriter(nil)
-		}},
 	}
 	sender := func(ctx context.Context, state *bufferState, headers map[string]string) error {
 		return nil
 	}
-	state := makeBlankBufferState(4096, true, &sync.Pool{New: func() interface{} {
-		return gzip.NewWriter(nil)
-	}})
+	state := makeBlankBufferState(4096, true)
 	for n := 0; n < b.N; n++ {
 		permanentErrs, sendingErr := c.pushLogRecords(context.Background(), logs.ResourceLogs(), state, map[string]string{}, sender)
 		assert.NoError(b, sendingErr)
diff --git a/exporter/splunkhecexporter/exporter.go b/exporter/splunkhecexporter/exporter.go
index 87a366fd1a0b..32d52278f5b5 100644
--- a/exporter/splunkhecexporter/exporter.go
+++ b/exporter/splunkhecexporter/exporter.go
@@ -15,14 +13,12 @@
 package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter"
 
 import (
-	"compress/gzip"
 	"context"
 	"errors"
 	"fmt"
 	"net"
 	"net/http"
 	"net/url"
-	"sync"
 	"time"
 
 	"go.opentelemetry.io/collector/component"
@@ -134,8 +132,5 @@ func buildClient(options *exporterOptions, config *Config, logger *zap.Logger) (
 			"__splunk_app_version": config.SplunkAppVersion,
 		},
 		config: config,
-		gzipWriterPool: &sync.Pool{New: func() interface{} {
-			return gzip.NewWriter(nil)
-		}},
 	}, nil
 }
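For completeness, a reproduction sketch of the hazard class behind issue 17083 (my construction, not the exporter's exact interleaving): a `*gzip.Writer` that is still referenced after being returned to a `sync.Pool` can be handed out to a second goroutine, and both then mutate the same writer state. Whether `go test -race` actually fires depends on whether the pool recycles that instance, so treat this as illustrative:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"sync"
	"testing"
)

var pool = sync.Pool{New: func() interface{} { return gzip.NewWriter(nil) }}

// TestRetainedPooledWriter returns a writer to the pool and then keeps
// using it — the bug pattern a pool-free design cannot express.
func TestRetainedPooledWriter(t *testing.T) {
	w := pool.Get().(*gzip.Writer)
	var buf1 bytes.Buffer
	w.Reset(&buf1)
	pool.Put(w) // bug: w escapes the pool but is still used below

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		w2 := pool.Get().(*gzip.Writer) // may be the very instance still held as w
		var buf2 bytes.Buffer
		w2.Reset(&buf2)
		_, _ = w2.Write([]byte("two"))
		_ = w2.Close()
	}()

	_, _ = w.Write([]byte("one")) // concurrent use of shared writer state if w == w2
	_ = w.Close()
	wg.Wait()
}
```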