Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/splunkhec] Fix data race when gzip compression is enabled #17084

Merged
merged 2 commits into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/splunkhec-gzip-pool-data-race-fix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: splunkhecexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix data race when gzip compression is enabled

# One or more tracking issues related to the change
issues: [17083]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Removed the gzip writer pool; a new gzip writer is now created whenever one is needed.
34 changes: 13 additions & 21 deletions exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ type client struct {
logger *zap.Logger
wg sync.WaitGroup
headers map[string]string
gzipWriterPool *sync.Pool
}

// bufferState encapsulates intermediate buffer state when pushing data
Expand All @@ -66,7 +65,6 @@ type bufferState struct {
bufFront *index
resource int
library int
gzipWriterPool *sync.Pool
}

func (b *bufferState) reset() {
Expand Down Expand Up @@ -97,14 +95,13 @@ func (b *bufferState) accept(data []byte) (bool, error) {
if b.compressionAvailable && !b.compressionEnabled && bufLen > minCompressionLen {
// switch over to a zip buffer.
tmpBuf := bytes.NewBuffer(make([]byte, 0, b.bufferMaxLen+bufCapPadding))
writer := b.gzipWriterPool.Get().(*gzip.Writer)
writer := gzip.NewWriter(tmpBuf)
writer.Reset(tmpBuf)
zipWriter := &cancellableGzipWriter{
innerBuffer: tmpBuf,
innerWriter: writer,
// 8 bytes required for the zip footer.
maxCapacity: b.bufferMaxLen - 8,
gzipWriterPool: b.gzipWriterPool,
maxCapacity: b.bufferMaxLen - 8,
}

if b.bufferMaxLen == 0 {
Expand Down Expand Up @@ -150,11 +147,10 @@ func (c *cancellableBytesWriter) Write(b []byte) (int, error) {
}

type cancellableGzipWriter struct {
innerBuffer *bytes.Buffer
innerWriter *gzip.Writer
maxCapacity uint
gzipWriterPool *sync.Pool
len int
innerBuffer *bytes.Buffer
innerWriter *gzip.Writer
maxCapacity uint
len int
}

func (c *cancellableGzipWriter) Write(b []byte) (int, error) {
Expand All @@ -174,8 +170,7 @@ func (c *cancellableGzipWriter) Write(b []byte) (int, error) {
// so we create a copy of our content and add this new data, compressed, to check that it fits.
copyBuf := bytes.NewBuffer(make([]byte, 0, c.maxCapacity+bufCapPadding))
copyBuf.Write(c.innerBuffer.Bytes())
writerCopy := c.gzipWriterPool.Get().(*gzip.Writer)
defer c.gzipWriterPool.Put(writerCopy)
writerCopy := gzip.NewWriter(copyBuf)
writerCopy.Reset(copyBuf)
if _, err := writerCopy.Write(b); err != nil {
return 0, err
Expand All @@ -192,9 +187,7 @@ func (c *cancellableGzipWriter) Write(b []byte) (int, error) {
}

func (c *cancellableGzipWriter) close() error {
err := c.innerWriter.Close()
c.gzipWriterPool.Put(c.innerWriter)
return err
return c.innerWriter.Close()
}

// Composite index of a record.
Expand Down Expand Up @@ -323,7 +316,7 @@ func isProfilingData(sl plog.ScopeLogs) bool {
return sl.Scope().Name() == profilingLibraryName
}

func makeBlankBufferState(bufCap uint, compressionAvailable bool, pool *sync.Pool) *bufferState {
func makeBlankBufferState(bufCap uint, compressionAvailable bool) *bufferState {
// Buffer of JSON encoded Splunk events, last record is expected to overflow bufCap, hence the padding
buf := bytes.NewBuffer(make([]byte, 0, bufCap+bufCapPadding))

Expand All @@ -336,7 +329,6 @@ func makeBlankBufferState(bufCap uint, compressionAvailable bool, pool *sync.Poo
bufFront: nil, // Index of the log record of the first unsent event in buffer.
resource: 0, // Index of currently processed Resource
library: 0, // Index of currently processed Library
gzipWriterPool: pool,
}
}

Expand All @@ -346,8 +338,8 @@ func makeBlankBufferState(bufCap uint, compressionAvailable bool, pool *sync.Poo
// The input data may contain both logs and profiling data.
// They are batched separately and sent with different HTTP headers
func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, send func(context.Context, *bufferState, map[string]string) error) error {
var bufState = makeBlankBufferState(c.config.MaxContentLengthLogs, !c.config.DisableCompression, c.gzipWriterPool)
var profilingBufState = makeBlankBufferState(c.config.MaxContentLengthLogs, !c.config.DisableCompression, c.gzipWriterPool)
var bufState = makeBlankBufferState(c.config.MaxContentLengthLogs, !c.config.DisableCompression)
var profilingBufState = makeBlankBufferState(c.config.MaxContentLengthLogs, !c.config.DisableCompression)
var permanentErrors []error

var rls = ld.ResourceLogs()
Expand Down Expand Up @@ -602,7 +594,7 @@ func (c *client) pushTracesData(ctx context.Context, tds ptrace.ResourceSpansSli
// The batch content length is restricted to MaxContentLengthMetrics.
// md metrics are parsed to Splunk events.
func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metrics, send func(context.Context, *bufferState) error) error {
var bufState = makeBlankBufferState(c.config.MaxContentLengthMetrics, !c.config.DisableCompression, c.gzipWriterPool)
var bufState = makeBlankBufferState(c.config.MaxContentLengthMetrics, !c.config.DisableCompression)
var permanentErrors []error

var rms = md.ResourceMetrics()
Expand Down Expand Up @@ -637,7 +629,7 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric
// The batch content length is restricted to MaxContentLengthMetrics.
// td traces are parsed to Splunk events.
func (c *client) pushTracesDataInBatches(ctx context.Context, td ptrace.Traces, send func(context.Context, *bufferState) error) error {
bufState := makeBlankBufferState(c.config.MaxContentLengthTraces, !c.config.DisableCompression, c.gzipWriterPool)
bufState := makeBlankBufferState(c.config.MaxContentLengthTraces, !c.config.DisableCompression)
var permanentErrors []error

var rts = td.ResourceSpans()
Expand Down
40 changes: 3 additions & 37 deletions exporter/splunkhecexporter/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"regexp"
"sort"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1023,9 +1022,6 @@ func Test_pushLogData_nil_Logs(t *testing.T) {
c := client{
config: NewFactory().CreateDefaultConfig().(*Config),
logger: zaptest.NewLogger(t),
gzipWriterPool: &sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}},
}

for _, test := range tests {
Expand Down Expand Up @@ -1061,9 +1057,6 @@ func Test_pushLogData_PostError(t *testing.T) {
url: &url.URL{Host: "in va lid"},
config: NewFactory().CreateDefaultConfig().(*Config),
logger: zaptest.NewLogger(t),
gzipWriterPool: &sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}},
}

// 2000 log records -> ~371888 bytes when JSON encoded.
Expand Down Expand Up @@ -1104,9 +1097,6 @@ func Test_pushLogData_ShouldAddResponseTo400Error(t *testing.T) {
url: &url.URL{Scheme: "http", Host: "splunk"},
config: NewFactory().CreateDefaultConfig().(*Config),
logger: zaptest.NewLogger(t),
gzipWriterPool: &sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}},
}
logs := createLogData(1, 1, 1)

Expand Down Expand Up @@ -1139,9 +1129,6 @@ func Test_pushLogData_ShouldReturnUnsentLogsOnly(t *testing.T) {
url: &url.URL{Scheme: "http", Host: "splunk"},
config: config,
logger: zaptest.NewLogger(t),
gzipWriterPool: &sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}},
}

// Just two records
Expand Down Expand Up @@ -1169,9 +1156,6 @@ func Test_pushLogData_ShouldAddHeadersForProfilingData(t *testing.T) {
url: &url.URL{Scheme: "http", Host: "splunk"},
config: NewFactory().CreateDefaultConfig().(*Config),
logger: zaptest.NewLogger(t),
gzipWriterPool: &sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}},
}

logs := createLogDataWithCustomLibraries(1, []string{"otel.logs", "otel.profiling"}, []int{10, 20})
Expand Down Expand Up @@ -1238,9 +1222,6 @@ func benchPushLogData(b *testing.B, numResources int, numProfiling int, numNonPr
url: &url.URL{Scheme: "http", Host: "splunk"},
config: NewFactory().CreateDefaultConfig().(*Config),
logger: zaptest.NewLogger(b),
gzipWriterPool: &sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}},
}

c.client, _ = newTestClient(200, "OK")
Expand All @@ -1261,9 +1242,6 @@ func Test_pushLogData_Small_MaxContentLength(t *testing.T) {
logger: zaptest.NewLogger(t),
url: &url.URL{Scheme: "http", Host: "splunk"},
client: http.DefaultClient,
gzipWriterPool: &sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}},
}
c.config.MaxContentLengthLogs = 1

Expand Down Expand Up @@ -1339,9 +1317,6 @@ func TestSubLogs(t *testing.T) {

c := client{
config: NewFactory().CreateDefaultConfig().(*Config),
gzipWriterPool: &sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}},
}

// Logs subset from leftmost index (resource 0, library 0, record 0).
Expand Down Expand Up @@ -1413,9 +1388,7 @@ func TestHecHealthCheckFailed(t *testing.T) {
url: &url.URL{Scheme: "http", Host: "splunk"},
config: NewFactory().CreateDefaultConfig().(*Config),
logger: zaptest.NewLogger(t),
gzipWriterPool: &sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}},

healthCheckURL: &url.URL{Scheme: "http", Host: "splunk", Path: "/services/collector/health"},
}
c.client, _ = newTestClient(503, "NOK")
Expand All @@ -1429,9 +1402,7 @@ func TestHecHealthCheckSucceded(t *testing.T) {
url: &url.URL{Scheme: "http", Host: "splunk"},
config: NewFactory().CreateDefaultConfig().(*Config),
logger: zaptest.NewLogger(t),
gzipWriterPool: &sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}},

healthCheckURL: &url.URL{Scheme: "http", Host: "splunk", Path: "/services/collector/health"},
}
c.client, _ = newTestClient(200, "OK")
Expand Down Expand Up @@ -1459,16 +1430,11 @@ func BenchmarkPushLogRecords(b *testing.B) {
url: &url.URL{Scheme: "http", Host: "splunk"},
config: NewFactory().CreateDefaultConfig().(*Config),
logger: zap.NewNop(),
gzipWriterPool: &sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}},
}
sender := func(ctx context.Context, state *bufferState, headers map[string]string) error {
return nil
}
state := makeBlankBufferState(4096, true, &sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}})
state := makeBlankBufferState(4096, true)
for n := 0; n < b.N; n++ {
permanentErrs, sendingErr := c.pushLogRecords(context.Background(), logs.ResourceLogs(), state, map[string]string{}, sender)
assert.NoError(b, sendingErr)
Expand Down
5 changes: 0 additions & 5 deletions exporter/splunkhecexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
package splunkhecexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter"

import (
"compress/gzip"
"context"
"errors"
"fmt"
"net"
"net/http"
"net/url"
"sync"
"time"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -134,8 +132,5 @@ func buildClient(options *exporterOptions, config *Config, logger *zap.Logger) (
"__splunk_app_version": config.SplunkAppVersion,
},
config: config,
gzipWriterPool: &sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}},
}, nil
}