Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make gzip default for otlp gRPC/HTTP exporters #4632

Merged
merged 9 commits into from
Feb 3, 2022
41 changes: 40 additions & 1 deletion config/configgrpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ configuration. For more information, see [configtls
README](../configtls/README.md).

- [`balancer_name`](https://github.com/grpc/grpc-go/blob/master/examples/features/load_balancing/README.md)
- `compression` Compression type to use among `gzip`, `snappy` and `zstd`
- `compression` Compression type to use among `gzip`, `snappy`, `zstd`, and `none`. Defaults to `gzip`. See compression comparison below for more details.
- `endpoint`: Valid value syntax available [here](https://github.com/grpc/grpc/blob/master/doc/naming.md)
- [`tls`](../configtls/README.md)
- `headers`: name/value pairs added to the request
Expand Down Expand Up @@ -44,6 +44,45 @@ exporters:
"test 2": "value 2"
```

### Compression Comparison
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved

[configgrpc_benchmark_test.go](./configgrpc_benchmark_test.go) contains benchmarks comparing the supported compression algorithms. It performs compression using `gzip`, `zstd`, and `snappy` compression on small, medium, and large sized log, trace, and metric payloads. Each test case outputs the uncompressed payload size, the compressed payload size, and the average nanoseconds spent on compression.

The following table summarizes the results, including some additional columns computed from the raw data. Compression ratios will vary in practice as they are highly dependent on the data's information entropy. Compression rates are dependent on the speed of the CPU, and the size of payloads being compressed: smaller payloads compress at slower rates relative to larger payloads, which are able to amortize fixed computation costs over more bytes.

The default `gzip` compression is not as fast as snappy, but achieves better compression ratios and has reasonable performance. It's also the only required compression algorithm for [OTLP servers](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#protocol-details). If your collector is CPU bound and your OTLP server supports it, you may benefit from using `snappy` compression. If your collector is CPU bound and has a very fast network link, you may benefit from disabling compression with `none`.

| Request | Compressor | Raw Bytes | Compressed bytes | Compression ratio | Ns / op | Mb compressed / second | Mb saved / second |
|-------------------|------------|-----------|------------------|-------------------|---------|------------------------|-------------------|
| lg_log_request | gzip | 5150 | 262 | 19.66 | 51371 | 100.25 | 95.15 |
| lg_metric_request | gzip | 6800 | 201 | 33.83 | 53983 | 125.97 | 122.24 |
| lg_trace_request | gzip | 9200 | 270 | 34.07 | 60681 | 151.61 | 147.16 |
| md_log_request | gzip | 363 | 268 | 1.35 | 40763 | 8.91 | 2.33 |
| md_metric_request | gzip | 320 | 145 | 2.21 | 36724 | 8.71 | 4.77 |
| md_trace_request | gzip | 451 | 288 | 1.57 | 41440 | 10.88 | 3.93 |
| sm_log_request | gzip | 166 | 168 | 0.99 | 39879 | 4.16 | -0.05 |
| sm_metric_request | gzip | 185 | 142 | 1.30 | 37546 | 4.93 | 1.15 |
| sm_trace_request | gzip | 233 | 205 | 1.14 | 36069 | 6.46 | 0.78 |
| lg_log_request | snappy | 5150 | 475 | 10.84 | 1494 | 3,447.12 | 3,129.18 |
| lg_metric_request | snappy | 6800 | 466 | 14.59 | 1738 | 3,912.54 | 3,644.42 |
| lg_trace_request | snappy | 9200 | 644 | 14.29 | 2510 | 3,665.34 | 3,408.76 |
| md_log_request | snappy | 363 | 300 | 1.21 | 615.5 | 589.76 | 102.36 |
| md_metric_request | snappy | 320 | 162 | 1.98 | 457.7 | 699.15 | 345.20 |
| md_trace_request | snappy | 451 | 330 | 1.37 | 698.7 | 645.48 | 173.18 |
| sm_log_request | snappy | 166 | 184 | 0.90 | 443.6 | 374.21 | -40.58 |
| sm_metric_request | snappy | 185 | 154 | 1.20 | 397.9 | 464.94 | 77.91 |
| sm_trace_request | snappy | 233 | 251 | 0.93 | 526.0 | 442.97 | -34.22 |
| lg_log_request | zstd | 5150 | 223 | 23.09 | 15810 | 325.74 | 311.64 |
| lg_metric_request | zstd | 6800 | 144 | 47.22 | 11134 | 610.74 | 597.81 |
| lg_trace_request | zstd | 9200 | 208 | 44.23 | 13696 | 671.73 | 656.54 |
| md_log_request | zstd | 363 | 261 | 1.39 | 10258 | 35.39 | 9.94 |
| md_metric_request | zstd | 320 | 145 | 2.21 | 8147 | 39.28 | 21.48 |
| md_trace_request | zstd | 451 | 301 | 1.50 | 11380 | 39.63 | 13.18 |
| sm_log_request | zstd | 166 | 165 | 1.01 | 11553 | 14.37 | 0.09 |
| sm_metric_request | zstd | 185 | 139 | 1.33 | 7645 | 24.20 | 6.02 |
| sm_trace_request | zstd | 233 | 203 | 1.15 | 9334 | 24.96 | 3.21 |


## Server Configuration

[Receivers](https://github.com/open-telemetry/opentelemetry-collector/blob/main/receiver/README.md)
Expand Down
14 changes: 9 additions & 5 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
// Compression gRPC keys for supported compression types within collector.
const (
CompressionUnsupported = ""
CompressionNone = "none"
CompressionGzip = "gzip"
CompressionSnappy = "snappy"
CompressionZstd = "zstd"
Expand Down Expand Up @@ -194,12 +195,15 @@ func (gcs *GRPCClientSettings) isSchemeHTTPS() bool {
// ToDialOptions maps configgrpc.GRPCClientSettings to a slice of dial options for gRPC.
func (gcs *GRPCClientSettings) ToDialOptions(host component.Host, settings component.TelemetrySettings) ([]grpc.DialOption, error) {
var opts []grpc.DialOption
if gcs.Compression != "" {
if compressionKey := GetGRPCCompressionKey(gcs.Compression); compressionKey != CompressionUnsupported {
opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(compressionKey)))
} else {
return nil, fmt.Errorf("unsupported compression type %q", gcs.Compression)
if strings.ToLower(gcs.Compression) != CompressionNone {
compressionKey := CompressionGzip
if gcs.Compression != "" {
compressionKey = GetGRPCCompressionKey(gcs.Compression)
if compressionKey == CompressionUnsupported {
return nil, fmt.Errorf("unsupported compression type %q", gcs.Compression)
}
}
opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(compressionKey)))
}

tlsCfg, err := gcs.TLSSetting.LoadTLSConfig()
Expand Down
181 changes: 181 additions & 0 deletions config/configgrpc/configgrpc_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
*
* Copyright 2014 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package configgrpc

import (
"bytes"
"fmt"
"testing"

"github.com/mostynb/go-grpc-compression/snappy"
"github.com/mostynb/go-grpc-compression/zstd"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/model/otlp"
"go.opentelemetry.io/collector/model/pdata"
)

func BenchmarkCompressors(b *testing.B) {
payloads := setupTestPayloads()

compressors := make([]encoding.Compressor, 0)
compressors = append(compressors, encoding.GetCompressor(gzip.Name))
compressors = append(compressors, encoding.GetCompressor(zstd.Name))
compressors = append(compressors, encoding.GetCompressor(snappy.Name))

for _, payload := range payloads {
for _, compressor := range compressors {
fmt.Printf(payload.name)
messageBytes, err := payload.marshaler.marshal(payload.message)
if err != nil {
b.Errorf("marshal(_) returned an error")
}

compressedBytes, err := compress(compressor, messageBytes)
if err != nil {
b.Errorf("Compressor.Compress(_) returned an error")
}

name := fmt.Sprintf("%v/raw_bytes_%v/compressed_bytes_%v/compressor_%v", payload.name, len(messageBytes), len(compressedBytes), compressor.Name())

b.Run(name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
if err != nil {
b.Errorf("marshal(_) returned an error")
}
_, err := compress(compressor, messageBytes)
if err != nil {
b.Errorf("compress(_) returned an error")
}
}
})
}
}
}

func compress(compressor encoding.Compressor, in []byte) ([]byte, error) {
if compressor == nil {
return nil, nil
}
wrapErr := func(err error) error {
return status.Errorf(codes.Internal, "error while compressing: %v", err.Error())
}
cbuf := &bytes.Buffer{}
z, err := compressor.Compress(cbuf)
if err != nil {
return nil, wrapErr(err)
}
if _, err := z.Write(in); err != nil {
return nil, wrapErr(err)
}
if err := z.Close(); err != nil {
return nil, wrapErr(err)
}
return cbuf.Bytes(), nil
}

type testPayload struct {
name string
message interface{}
marshaler marshaler
}

type marshaler interface {
marshal(interface{}) ([]byte, error)
}

type logMarshaler struct {
pdata.LogsMarshaler
}

func (m *logMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalLogs(e.(pdata.Logs))
}

type traceMarshaler struct {
pdata.TracesMarshaler
}

func (m *traceMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalTraces(e.(pdata.Traces))
}

type metricsMarshaler struct {
pdata.MetricsMarshaler
}

func (m *metricsMarshaler) marshal(e interface{}) ([]byte, error) {
return m.MarshalMetrics(e.(pdata.Metrics))
}

func setupTestPayloads() []testPayload {
payloads := make([]testPayload, 0)

// log payloads
logMarshaler := &logMarshaler{otlp.NewProtobufLogsMarshaler()}
payloads = append(payloads, testPayload{
name: "sm_log_request",
message: testdata.GenerateLogsOneLogRecord(),
marshaler: logMarshaler})
payloads = append(payloads, testPayload{
name: "md_log_request",
message: testdata.GenerateLogsTwoLogRecordsSameResourceOneDifferent(),
marshaler: logMarshaler})
payloads = append(payloads, testPayload{
name: "lg_log_request",
message: testdata.GenerateLogsManyLogRecordsSameResource(50),
marshaler: logMarshaler})

// trace payloads
tracesMarshaler := &traceMarshaler{otlp.NewProtobufTracesMarshaler()}
payloads = append(payloads, testPayload{
name: "sm_trace_request",
message: testdata.GenerateTracesOneSpan(),
marshaler: tracesMarshaler})
payloads = append(payloads, testPayload{
name: "md_trace_request",
message: testdata.GenerateTracesTwoSpansSameResourceOneDifferent(),
marshaler: tracesMarshaler})
payloads = append(payloads, testPayload{
name: "lg_trace_request",
message: testdata.GenerateTracesManySpansSameResource(50),
marshaler: tracesMarshaler})

// metric payloads
metricsMarshaler := &metricsMarshaler{otlp.NewProtobufMetricsMarshaler()}
payloads = append(payloads, testPayload{
name: "sm_metric_request",
message: testdata.GenerateMetricsOneMetric(),
marshaler: metricsMarshaler})
payloads = append(payloads, testPayload{
name: "md_metric_request",
message: testdata.GenerateMetricsTwoMetrics(),
marshaler: metricsMarshaler})
payloads = append(payloads, testPayload{
name: "lg_metric_request",
message: testdata.GenerateMetricsManyMetricsSameResource(50),
marshaler: metricsMarshaler})

return payloads
}
18 changes: 17 additions & 1 deletion config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,22 @@ func TestDefaultGrpcClientSettings(t *testing.T) {
}
opts, err := gcs.ToDialOptions(componenttest.NewNopHost(), tt.TelemetrySettings)
assert.NoError(t, err)
assert.Len(t, opts, 4)
}

func TestNoneCompressionClientSettings(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

gcs := &GRPCClientSettings{
TLSSetting: configtls.TLSClientSetting{
Insecure: true,
},
Compression: "noNe",
}
opts, err := gcs.ToDialOptions(componenttest.NewNopHost(), tt.TelemetrySettings)
assert.NoError(t, err)
assert.Len(t, opts, 3)
}

Expand Down Expand Up @@ -267,7 +283,7 @@ func TestUseSecure(t *testing.T) {
}
dialOpts, err := gcs.ToDialOptions(componenttest.NewNopHost(), tt.TelemetrySettings)
assert.NoError(t, err)
assert.Len(t, dialOpts, 3)
assert.Len(t, dialOpts, 4)
}

func TestGRPCServerSettingsError(t *testing.T) {
Expand Down