Skip to content

Commit

Permalink
Add zstd compression to SAPM receiver and exporter (open-telemetry#23257
Browse files Browse the repository at this point in the history
)

- Updated github.com/signalfx/sapm-proto to v0.13.0
- Added "compression" config setting to sapm exporter
- Added tests to verify various compression settings for sapm receiver
and exporter.
tigrannajaryan authored Jun 15, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent c2658a7 commit ae15d8c
Showing 9 changed files with 270 additions and 36 deletions.
20 changes: 20 additions & 0 deletions .chloggen/sapm-exporter-zstd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: sapmexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: sapm exporter now supports `compression` config option to specify either gzip or zstd compression to use.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23257]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
20 changes: 20 additions & 0 deletions .chloggen/sapm-receiver-zstd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: sapmreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: sapm receiver now accepts requests in compressed with zstd.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23257]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
5 changes: 5 additions & 0 deletions exporter/sapmexporter/README.md
Original file line number Diff line number Diff line change
@@ -48,6 +48,11 @@ during final translation. Intended to be used in tandem with identical configur
- `timeout` (default = 5s): Is the timeout for every attempt to send data to the backend.
- `log_detailed_response` (default = `false`): Option to log detailed response from Splunk APM.
In addition to setting this option to `true`, debug logging at the Collector level needs to be enabled.
- `compression`: Compression method to use for outgoing SAPM requests. Can be one of
"gzip", "zstd" or be unspecified. If unspecified then "gzip" compression is used unless
`disable_compression` option is set to true.
- `disable_compression` (default = `false`): If set to true the outgoing requests are not
compressed and `compression` option is ignored.

In addition, this exporter offers queued retry which is enabled by default.
Information about queued retry configuration parameters can be found
22 changes: 21 additions & 1 deletion exporter/sapmexporter/config.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ package sapmexporter // import "github.com/open-telemetry/opentelemetry-collecto

import (
"errors"
"fmt"
"net/url"

sapmclient "github.com/signalfx/sapm-proto/client"
@@ -35,9 +36,13 @@ type Config struct {
// MaxConnections is used to set a limit to the maximum idle HTTP connection the exporter can keep open.
MaxConnections uint `mapstructure:"max_connections"`

// Disable GZip compression.
// Disable compression. If set to true then Compression field is ignored.
DisableCompression bool `mapstructure:"disable_compression"`

// Compression method to use (gzip or zstd). Ignored if DisableCompression=true.
// If unspecified defaults to gzip.
Compression string `mapstructure:"compression"`

// Log detailed response from trace ingest.
LogDetailedResponse bool `mapstructure:"log_detailed_response"`

@@ -56,6 +61,17 @@ func (c *Config) Validate() error {
if err != nil {
return err
}

switch c.Compression {
// Valid compression methods.
case "", // no compression
string(sapmclient.CompressionMethodGzip),
string(sapmclient.CompressionMethodZstd):

default:
return fmt.Errorf("invalid compression %q", c.Compression)
}

return nil
}

@@ -85,5 +101,9 @@ func (c *Config) clientOptions() []sapmclient.Option {
opts = append(opts, sapmclient.WithDisabledCompression())
}

if c.Compression != "" {
opts = append(opts, sapmclient.WithCompressionMethod(sapmclient.CompressionMethod(c.Compression)))
}

return opts
}
6 changes: 6 additions & 0 deletions exporter/sapmexporter/config_test.go
Original file line number Diff line number Diff line change
@@ -101,6 +101,12 @@ func TestInvalidConfig(t *testing.T) {
invalidURLErr := invalid.Validate()
require.Error(t, invalidURLErr)

invalid = Config{
Endpoint: "http://localhost",
Compression: "nosuchcompression",
}
assert.Error(t, invalid.Validate())

invalid = Config{
Endpoint: "abcd1234",
QueueSettings: exporterhelper.QueueSettings{
116 changes: 116 additions & 0 deletions exporter/sapmexporter/exporter_test.go
Original file line number Diff line number Diff line change
@@ -4,14 +4,18 @@
package sapmexporter

import (
"compress/gzip"
"context"
"crypto/rand"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/jaegertracing/jaeger/model"
"github.com/klauspost/compress/zstd"
splunksapm "github.com/signalfx/sapm-proto/gen"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/exporter/exportertest"
@@ -210,3 +214,115 @@ func TestSAPMClientTokenUsageAndErrorMarshalling(t *testing.T) {
})
}
}

func decompress(body io.Reader, compression string) ([]byte, error) {
switch compression {
case "":
return io.ReadAll(body)
case "gzip":
reader, err := gzip.NewReader(body)
if err != nil {
return nil, err
}
return io.ReadAll(reader)
case "zstd":
reader, err := zstd.NewReader(body)
if err != nil {
return nil, err
}
return io.ReadAll(reader)
}
return nil, fmt.Errorf("unknown compression %q", compression)
}

func TestCompression(t *testing.T) {
tests := []struct {
name string
configDisableCompression bool
configCompression string
receivedCompression string
}{
{
name: "unspecified config",
configCompression: "",
configDisableCompression: false,
receivedCompression: "gzip",
},
{
name: "gzip",
configCompression: "gzip",
configDisableCompression: false,
receivedCompression: "gzip",
},
{
name: "zstd",
configCompression: "zstd",
configDisableCompression: false,
receivedCompression: "zstd",
},
{
name: "disable compression and unspecified method",
configDisableCompression: true,
configCompression: "",
receivedCompression: "",
},
{
name: "disable compression and specify gzip",
configDisableCompression: true,
configCompression: "gzip",
receivedCompression: "",
},
{
name: "disable compression and specify zstd",
configDisableCompression: true,
configCompression: "zstd",
receivedCompression: "",
},
}
for _, tt := range tests {
tt := tt
t.Run(
tt.name, func(t *testing.T) {
tracesReceived := false
server := httptest.NewServer(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
compression := r.Header.Get("Content-Encoding")
assert.EqualValues(t, compression, tt.receivedCompression)

payload, err := decompress(r.Body, compression)
require.NoError(t, err)

var sapm splunksapm.PostSpansRequest
err = sapm.Unmarshal(payload)
require.NoError(t, err)

w.WriteHeader(200)
tracesReceived = true
},
),
)
defer func() {
assert.True(t, tracesReceived, "Test server never received traces.")
}()
defer server.Close()

cfg := &Config{
Endpoint: server.URL,
DisableCompression: tt.configDisableCompression,
Compression: tt.configCompression,
}
params := exportertest.NewNopCreateSettings()

se, err := newSAPMExporter(cfg, params)
assert.Nil(t, err)
assert.NotNil(t, se, "failed to create trace exporter")

trace, testTraceErr := buildTestTrace()
require.NoError(t, testTraceErr)
err = se.pushTraceData(context.Background(), trace)
require.NoError(t, err)
},
)
}
}
1 change: 1 addition & 0 deletions exporter/sapmexporter/go.mod
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ go 1.19
require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/jaegertracing/jaeger v1.41.0
github.com/klauspost/compress v1.16.5
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.79.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.79.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.79.0
2 changes: 1 addition & 1 deletion receiver/sapmreceiver/go.mod
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ go 1.19
require (
github.com/gorilla/mux v1.8.0
github.com/jaegertracing/jaeger v1.41.0
github.com/klauspost/compress v1.16.5
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.79.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.79.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.79.0
@@ -31,7 +32,6 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.16.5 // indirect
github.com/knadh/koanf v1.5.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
114 changes: 80 additions & 34 deletions receiver/sapmreceiver/trace_receiver_test.go
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ import (
"time"

"github.com/jaegertracing/jaeger/model"
"github.com/klauspost/compress/zstd"
splunksapm "github.com/signalfx/sapm-proto/gen"
"github.com/signalfx/sapm-proto/sapmprotocol"
"github.com/stretchr/testify/assert"
@@ -121,32 +122,34 @@ func grpcFixture(t1 time.Time) *model.Batch {
}

// sendSapm acts as a client for sending sapm to the receiver. This could be replaced with a sapm exporter in the future.
func sendSapm(endpoint string, sapm *splunksapm.PostSpansRequest, zipped bool, tlsEnabled bool, token string) (*http.Response, error) {
func sendSapm(
endpoint string,
sapm *splunksapm.PostSpansRequest,
compression string,
tlsEnabled bool,
token string,
) (*http.Response, error) {
// marshal the sapm
reqBytes, err := sapm.Marshal()
if err != nil {
return nil, fmt.Errorf("failed to marshal sapm %w", err)
}

if zipped {
// create a gzip writer
var buff bytes.Buffer
writer := gzip.NewWriter(&buff)

// run the request bytes through the gzip writer
_, err = writer.Write(reqBytes)
switch compression {
case "gzip":
reqBytes, err = compressGzip(reqBytes)
if err != nil {
return nil, fmt.Errorf("failed to write gzip sapm %w", err)
return nil, err
}

// close the writer
err = writer.Close()
case "zstd":
reqBytes, err = compressZstd(reqBytes)
if err != nil {
return nil, fmt.Errorf("failed to close the gzip writer %w", err)
return nil, err
}

// save the gzipped bytes as the request bytes
reqBytes = buff.Bytes()
case "":
// no compression
default:
return nil, fmt.Errorf("unknown compression %q", compression)
}

// build the request
@@ -158,8 +161,8 @@ func sendSapm(endpoint string, sapm *splunksapm.PostSpansRequest, zipped bool, t
req.Header.Set(sapmprotocol.ContentTypeHeaderName, sapmprotocol.ContentTypeHeaderValue)

// set headers for gzip
if zipped {
req.Header.Set(sapmprotocol.ContentEncodingHeaderName, sapmprotocol.GZipEncodingHeaderValue)
if compression != "" {
req.Header.Set(sapmprotocol.ContentEncodingHeaderName, compression)
req.Header.Set(sapmprotocol.AcceptEncodingHeaderName, sapmprotocol.GZipEncodingHeaderValue)
}

@@ -196,6 +199,49 @@ func sendSapm(endpoint string, sapm *splunksapm.PostSpansRequest, zipped bool, t
return resp, nil
}

func compressGzip(reqBytes []byte) ([]byte, error) {
// create a gzip writer
var buff bytes.Buffer
writer := gzip.NewWriter(&buff)

// run the request bytes through the gzip writer
_, err := writer.Write(reqBytes)
if err != nil {
return nil, fmt.Errorf("failed to write gzip sapm %w", err)
}

// close the writer
err = writer.Close()
if err != nil {
return nil, fmt.Errorf("failed to close the gzip writer %w", err)
}

return buff.Bytes(), nil
}

func compressZstd(reqBytes []byte) ([]byte, error) {
// create a gzip writer
var buff bytes.Buffer
writer, err := zstd.NewWriter(&buff)
if err != nil {
return nil, fmt.Errorf("failed to write zstd sapm %w", err)
}

// run the request bytes through the gzip writer
_, err = writer.Write(reqBytes)
if err != nil {
return nil, fmt.Errorf("failed to write zstd sapm %w", err)
}

// close the writer
err = writer.Close()
if err != nil {
return nil, fmt.Errorf("failed to close the zstd writer %w", err)
}

return buff.Bytes(), nil
}

func setupReceiver(t *testing.T, config *Config, sink *consumertest.TracesSink) receiver.Traces {
params := receivertest.NewNopCreateSettings()
sr, err := newReceiver(params, config, sink)
@@ -219,10 +265,10 @@ func TestReception(t *testing.T) {
tlsAddress := testutil.GetAvailableLocalAddress(t)

type args struct {
config *Config
sapm *splunksapm.PostSpansRequest
zipped bool
useTLS bool
config *Config
sapm *splunksapm.PostSpansRequest
compression string
useTLS bool
}
tests := []struct {
name string
@@ -238,9 +284,9 @@ func TestReception(t *testing.T) {
Endpoint: defaultEndpoint,
},
},
sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}},
zipped: false,
useTLS: false,
sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}},
compression: "",
useTLS: false,
},
want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec),
},
@@ -252,14 +298,14 @@ func TestReception(t *testing.T) {
Endpoint: defaultEndpoint,
},
},
sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}},
zipped: true,
useTLS: false,
sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}},
compression: "gzip",
useTLS: false,
},
want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec),
},
{
name: "connect via TLS compressed sapm",
name: "connect via TLS zstd compressed sapm",
args: args{
config: &Config{
HTTPServerSettings: confighttp.HTTPServerSettings{
@@ -273,9 +319,9 @@ func TestReception(t *testing.T) {
},
},
},
sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}},
zipped: false,
useTLS: true,
sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}},
compression: "zstd",
useTLS: true,
},
want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec),
},
@@ -291,7 +337,7 @@ func TestReception(t *testing.T) {

t.Log("Sending Sapm Request")
var resp *http.Response
resp, err := sendSapm(tt.args.config.Endpoint, tt.args.sapm, tt.args.zipped, tt.args.useTLS, "")
resp, err := sendSapm(tt.args.config.Endpoint, tt.args.sapm, tt.args.compression, tt.args.useTLS, "")
require.NoError(t, err)
assert.Equal(t, 200, resp.StatusCode)
t.Log("SAPM Request Received")
@@ -356,7 +402,7 @@ func TestAccessTokenPassthrough(t *testing.T) {
}()

var resp *http.Response
resp, err := sendSapm(config.Endpoint, sapm, true, false, tt.token)
resp, err := sendSapm(config.Endpoint, sapm, "gzip", false, tt.token)
require.NoErrorf(t, err, "should not have failed when sending sapm %v", err)
assert.Equal(t, 200, resp.StatusCode)

0 comments on commit ae15d8c

Please sign in to comment.