From ae15d8c964bc9ff37a180c441ad44f4bbbac38f7 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com> Date: Thu, 15 Jun 2023 07:04:14 -0400 Subject: [PATCH] Add zstd compression to SAPM receiver and exporter (#23257) - Updated github.com/signalfx/sapm-proto to v0.13.0 - Added "compression" config setting to sapm exporter - Added tests to verify various compression settings for sapm receiver and exporter. --- .chloggen/sapm-exporter-zstd.yaml | 20 ++++ .chloggen/sapm-receiver-zstd.yaml | 20 ++++ exporter/sapmexporter/README.md | 5 + exporter/sapmexporter/config.go | 22 +++- exporter/sapmexporter/config_test.go | 6 + exporter/sapmexporter/exporter_test.go | 116 +++++++++++++++++++ exporter/sapmexporter/go.mod | 1 + receiver/sapmreceiver/go.mod | 2 +- receiver/sapmreceiver/trace_receiver_test.go | 114 ++++++++++++------ 9 files changed, 270 insertions(+), 36 deletions(-) create mode 100644 .chloggen/sapm-exporter-zstd.yaml create mode 100644 .chloggen/sapm-receiver-zstd.yaml diff --git a/.chloggen/sapm-exporter-zstd.yaml b/.chloggen/sapm-exporter-zstd.yaml new file mode 100644 index 000000000000..224950bfccdc --- /dev/null +++ b/.chloggen/sapm-exporter-zstd.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: sapmexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: sapm exporter now supports `compression` config option to specify either gzip or zstd compression to use. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [23257] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/.chloggen/sapm-receiver-zstd.yaml b/.chloggen/sapm-receiver-zstd.yaml new file mode 100644 index 000000000000..06dd596fd126 --- /dev/null +++ b/.chloggen/sapm-receiver-zstd.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: sapmreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: sapm receiver now accepts requests in compressed with zstd. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [23257] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/exporter/sapmexporter/README.md b/exporter/sapmexporter/README.md index 10a0ed981b0c..e394fdf2000b 100644 --- a/exporter/sapmexporter/README.md +++ b/exporter/sapmexporter/README.md @@ -48,6 +48,11 @@ during final translation. Intended to be used in tandem with identical configur - `timeout` (default = 5s): Is the timeout for every attempt to send data to the backend. - `log_detailed_response` (default = `false`): Option to log detailed response from Splunk APM. In addition to setting this option to `true`, debug logging at the Collector level needs to be enabled. +- `compression`: Compression method to use for outgoing SAPM requests. Can be one of + "gzip", "zstd" or be unspecified. If unspecified then "gzip" compression is used unless + `disable_compression` option is set to true. +- `disable_compression` (default = `false`): If set to true the outgoing requests are not + compressed and `compression` option is ignored. In addition, this exporter offers queued retry which is enabled by default. Information about queued retry configuration parameters can be found diff --git a/exporter/sapmexporter/config.go b/exporter/sapmexporter/config.go index d013706546bd..8f56221617b0 100644 --- a/exporter/sapmexporter/config.go +++ b/exporter/sapmexporter/config.go @@ -5,6 +5,7 @@ package sapmexporter // import "github.com/open-telemetry/opentelemetry-collecto import ( "errors" + "fmt" "net/url" sapmclient "github.com/signalfx/sapm-proto/client" @@ -35,9 +36,13 @@ type Config struct { // MaxConnections is used to set a limit to the maximum idle HTTP connection the exporter can keep open. MaxConnections uint `mapstructure:"max_connections"` - // Disable GZip compression. + // Disable compression. If set to true then Compression field is ignored. DisableCompression bool `mapstructure:"disable_compression"` + // Compression method to use (gzip or zstd). Ignored if DisableCompression=true. + // If unspecified defaults to gzip. + Compression string `mapstructure:"compression"` + // Log detailed response from trace ingest. LogDetailedResponse bool `mapstructure:"log_detailed_response"` @@ -56,6 +61,17 @@ func (c *Config) Validate() error { if err != nil { return err } + + switch c.Compression { + // Valid compression methods. + case "", // no compression + string(sapmclient.CompressionMethodGzip), + string(sapmclient.CompressionMethodZstd): + + default: + return fmt.Errorf("invalid compression %q", c.Compression) + } + return nil } @@ -85,5 +101,9 @@ func (c *Config) clientOptions() []sapmclient.Option { opts = append(opts, sapmclient.WithDisabledCompression()) } + if c.Compression != "" { + opts = append(opts, sapmclient.WithCompressionMethod(sapmclient.CompressionMethod(c.Compression))) + } + return opts } diff --git a/exporter/sapmexporter/config_test.go b/exporter/sapmexporter/config_test.go index b132b7f72d1a..a3c9de3d88ca 100644 --- a/exporter/sapmexporter/config_test.go +++ b/exporter/sapmexporter/config_test.go @@ -101,6 +101,12 @@ func TestInvalidConfig(t *testing.T) { invalidURLErr := invalid.Validate() require.Error(t, invalidURLErr) + invalid = Config{ + Endpoint: "http://localhost", + Compression: "nosuchcompression", + } + assert.Error(t, invalid.Validate()) + invalid = Config{ Endpoint: "abcd1234", QueueSettings: exporterhelper.QueueSettings{ diff --git a/exporter/sapmexporter/exporter_test.go b/exporter/sapmexporter/exporter_test.go index 6e402c70c523..f52079884c9f 100644 --- a/exporter/sapmexporter/exporter_test.go +++ b/exporter/sapmexporter/exporter_test.go @@ -4,14 +4,18 @@ package sapmexporter import ( + "compress/gzip" "context" "crypto/rand" "fmt" + "io" "net/http" "net/http/httptest" "testing" "github.com/jaegertracing/jaeger/model" + "github.com/klauspost/compress/zstd" + splunksapm "github.com/signalfx/sapm-proto/gen" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/exporter/exportertest" @@ -210,3 +214,115 @@ func TestSAPMClientTokenUsageAndErrorMarshalling(t *testing.T) { }) } } + +func decompress(body io.Reader, compression string) ([]byte, error) { + switch compression { + case "": + return io.ReadAll(body) + case "gzip": + reader, err := gzip.NewReader(body) + if err != nil { + return nil, err + } + return io.ReadAll(reader) + case "zstd": + reader, err := zstd.NewReader(body) + if err != nil { + return nil, err + } + return io.ReadAll(reader) + } + return nil, fmt.Errorf("unknown compression %q", compression) +} + +func TestCompression(t *testing.T) { + tests := []struct { + name string + configDisableCompression bool + configCompression string + receivedCompression string + }{ + { + name: "unspecified config", + configCompression: "", + configDisableCompression: false, + receivedCompression: "gzip", + }, + { + name: "gzip", + configCompression: "gzip", + configDisableCompression: false, + receivedCompression: "gzip", + }, + { + name: "zstd", + configCompression: "zstd", + configDisableCompression: false, + receivedCompression: "zstd", + }, + { + name: "disable compression and unspecified method", + configDisableCompression: true, + configCompression: "", + receivedCompression: "", + }, + { + name: "disable compression and specify gzip", + configDisableCompression: true, + configCompression: "gzip", + receivedCompression: "", + }, + { + name: "disable compression and specify zstd", + configDisableCompression: true, + configCompression: "zstd", + receivedCompression: "", + }, + } + for _, tt := range tests { + tt := tt + t.Run( + tt.name, func(t *testing.T) { + tracesReceived := false + server := httptest.NewServer( + http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + compression := r.Header.Get("Content-Encoding") + assert.EqualValues(t, compression, tt.receivedCompression) + + payload, err := decompress(r.Body, compression) + require.NoError(t, err) + + var sapm splunksapm.PostSpansRequest + err = sapm.Unmarshal(payload) + require.NoError(t, err) + + w.WriteHeader(200) + tracesReceived = true + }, + ), + ) + defer func() { + assert.True(t, tracesReceived, "Test server never received traces.") + }() + defer server.Close() + + cfg := &Config{ + Endpoint: server.URL, + DisableCompression: tt.configDisableCompression, + Compression: tt.configCompression, + } + params := exportertest.NewNopCreateSettings() + + se, err := newSAPMExporter(cfg, params) + assert.Nil(t, err) + assert.NotNil(t, se, "failed to create trace exporter") + + trace, testTraceErr := buildTestTrace() + require.NoError(t, testTraceErr) + err = se.pushTraceData(context.Background(), trace) + require.NoError(t, err) + }, + ) + } +} diff --git a/exporter/sapmexporter/go.mod b/exporter/sapmexporter/go.mod index 1d623a2d0e25..dc38f3507c49 100644 --- a/exporter/sapmexporter/go.mod +++ b/exporter/sapmexporter/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/cenkalti/backoff/v4 v4.2.1 github.com/jaegertracing/jaeger v1.41.0 + github.com/klauspost/compress v1.16.5 github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.79.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.79.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.79.0 diff --git a/receiver/sapmreceiver/go.mod b/receiver/sapmreceiver/go.mod index 7c96bccc58d6..c1409d05df17 100644 --- a/receiver/sapmreceiver/go.mod +++ b/receiver/sapmreceiver/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/gorilla/mux v1.8.0 github.com/jaegertracing/jaeger v1.41.0 + github.com/klauspost/compress v1.16.5 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.79.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.79.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.79.0 @@ -31,7 +32,6 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.16.5 // indirect github.com/knadh/koanf v1.5.0 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect diff --git a/receiver/sapmreceiver/trace_receiver_test.go b/receiver/sapmreceiver/trace_receiver_test.go index 5390bd3ee624..3c14e0af8683 100644 --- a/receiver/sapmreceiver/trace_receiver_test.go +++ b/receiver/sapmreceiver/trace_receiver_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/jaegertracing/jaeger/model" + "github.com/klauspost/compress/zstd" splunksapm "github.com/signalfx/sapm-proto/gen" "github.com/signalfx/sapm-proto/sapmprotocol" "github.com/stretchr/testify/assert" @@ -121,32 +122,34 @@ func grpcFixture(t1 time.Time) *model.Batch { } // sendSapm acts as a client for sending sapm to the receiver. This could be replaced with a sapm exporter in the future. -func sendSapm(endpoint string, sapm *splunksapm.PostSpansRequest, zipped bool, tlsEnabled bool, token string) (*http.Response, error) { +func sendSapm( + endpoint string, + sapm *splunksapm.PostSpansRequest, + compression string, + tlsEnabled bool, + token string, +) (*http.Response, error) { // marshal the sapm reqBytes, err := sapm.Marshal() if err != nil { return nil, fmt.Errorf("failed to marshal sapm %w", err) } - if zipped { - // create a gzip writer - var buff bytes.Buffer - writer := gzip.NewWriter(&buff) - - // run the request bytes through the gzip writer - _, err = writer.Write(reqBytes) + switch compression { + case "gzip": + reqBytes, err = compressGzip(reqBytes) if err != nil { - return nil, fmt.Errorf("failed to write gzip sapm %w", err) + return nil, err } - - // close the writer - err = writer.Close() + case "zstd": + reqBytes, err = compressZstd(reqBytes) if err != nil { - return nil, fmt.Errorf("failed to close the gzip writer %w", err) + return nil, err } - - // save the gzipped bytes as the request bytes - reqBytes = buff.Bytes() + case "": + // no compression + default: + return nil, fmt.Errorf("unknown compression %q", compression) } // build the request @@ -158,8 +161,8 @@ func sendSapm(endpoint string, sapm *splunksapm.PostSpansRequest, zipped bool, t req.Header.Set(sapmprotocol.ContentTypeHeaderName, sapmprotocol.ContentTypeHeaderValue) // set headers for gzip - if zipped { - req.Header.Set(sapmprotocol.ContentEncodingHeaderName, sapmprotocol.GZipEncodingHeaderValue) + if compression != "" { + req.Header.Set(sapmprotocol.ContentEncodingHeaderName, compression) req.Header.Set(sapmprotocol.AcceptEncodingHeaderName, sapmprotocol.GZipEncodingHeaderValue) } @@ -196,6 +199,49 @@ func sendSapm(endpoint string, sapm *splunksapm.PostSpansRequest, zipped bool, t return resp, nil } +func compressGzip(reqBytes []byte) ([]byte, error) { + // create a gzip writer + var buff bytes.Buffer + writer := gzip.NewWriter(&buff) + + // run the request bytes through the gzip writer + _, err := writer.Write(reqBytes) + if err != nil { + return nil, fmt.Errorf("failed to write gzip sapm %w", err) + } + + // close the writer + err = writer.Close() + if err != nil { + return nil, fmt.Errorf("failed to close the gzip writer %w", err) + } + + return buff.Bytes(), nil +} + +func compressZstd(reqBytes []byte) ([]byte, error) { + // create a gzip writer + var buff bytes.Buffer + writer, err := zstd.NewWriter(&buff) + if err != nil { + return nil, fmt.Errorf("failed to write zstd sapm %w", err) + } + + // run the request bytes through the gzip writer + _, err = writer.Write(reqBytes) + if err != nil { + return nil, fmt.Errorf("failed to write zstd sapm %w", err) + } + + // close the writer + err = writer.Close() + if err != nil { + return nil, fmt.Errorf("failed to close the zstd writer %w", err) + } + + return buff.Bytes(), nil +} + func setupReceiver(t *testing.T, config *Config, sink *consumertest.TracesSink) receiver.Traces { params := receivertest.NewNopCreateSettings() sr, err := newReceiver(params, config, sink) @@ -219,10 +265,10 @@ func TestReception(t *testing.T) { tlsAddress := testutil.GetAvailableLocalAddress(t) type args struct { - config *Config - sapm *splunksapm.PostSpansRequest - zipped bool - useTLS bool + config *Config + sapm *splunksapm.PostSpansRequest + compression string + useTLS bool } tests := []struct { name string @@ -238,9 +284,9 @@ func TestReception(t *testing.T) { Endpoint: defaultEndpoint, }, }, - sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}}, - zipped: false, - useTLS: false, + sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}}, + compression: "", + useTLS: false, }, want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec), }, @@ -252,14 +298,14 @@ func TestReception(t *testing.T) { Endpoint: defaultEndpoint, }, }, - sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}}, - zipped: true, - useTLS: false, + sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}}, + compression: "gzip", + useTLS: false, }, want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec), }, { - name: "connect via TLS compressed sapm", + name: "connect via TLS zstd compressed sapm", args: args{ config: &Config{ HTTPServerSettings: confighttp.HTTPServerSettings{ @@ -273,9 +319,9 @@ func TestReception(t *testing.T) { }, }, }, - sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}}, - zipped: false, - useTLS: true, + sapm: &splunksapm.PostSpansRequest{Batches: []*model.Batch{grpcFixture(now)}}, + compression: "zstd", + useTLS: true, }, want: expectedTraceData(now, nowPlus10min, nowPlus10min2sec), }, @@ -291,7 +337,7 @@ func TestReception(t *testing.T) { t.Log("Sending Sapm Request") var resp *http.Response - resp, err := sendSapm(tt.args.config.Endpoint, tt.args.sapm, tt.args.zipped, tt.args.useTLS, "") + resp, err := sendSapm(tt.args.config.Endpoint, tt.args.sapm, tt.args.compression, tt.args.useTLS, "") require.NoError(t, err) assert.Equal(t, 200, resp.StatusCode) t.Log("SAPM Request Received") @@ -356,7 +402,7 @@ func TestAccessTokenPassthrough(t *testing.T) { }() var resp *http.Response - resp, err := sendSapm(config.Endpoint, sapm, true, false, tt.token) + resp, err := sendSapm(config.Endpoint, sapm, "gzip", false, tt.token) require.NoErrorf(t, err, "should not have failed when sending sapm %v", err) assert.Equal(t, 200, resp.StatusCode)