[confighttp] Allow compression list for a server to be overridden #10295

Merged
@@ -0,0 +1,18 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: confighttp

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Allow the compression list to be overridden

# One or more tracking issues or pull requests related to the change
issues: [10295]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Allows Collector administrators to control which compression algorithms to enable for HTTP-based receivers.
2 changes: 2 additions & 0 deletions config/confighttp/README.md
@@ -75,6 +75,7 @@ will not be enabled.
not set, browsers use a default of 5 seconds.
- `endpoint`: Valid value syntax available [here](https://github.com/grpc/grpc/blob/master/doc/naming.md)
- `max_request_body_size`: configures the maximum allowed body size in bytes for a single request. Default: `0` (no restriction)
- `compression_algorithms`: configures the list of compression algorithms the server can accept. Default: ["", "gzip", "zstd", "zlib", "snappy", "deflate"]
- [`tls`](../configtls/README.md)
- [`auth`](../configauth/README.md)

@@ -98,6 +99,7 @@ receivers:
- Example-Header
max_age: 7200
endpoint: 0.0.0.0:55690
compression_algorithms: ["", "gzip"]
processors:
attributes:
actions:
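For illustration, a standalone sketch of the new option in a collector configuration is shown below; the `otlp` receiver name and its endpoint are assumptions for the example, while `compression_algorithms` and the rejection behaviour come from this change.

```yaml
# Sketch (not part of this diff): restrict an HTTP receiver to uncompressed
# and gzip-encoded request bodies. Receiver name and endpoint are illustrative.
receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318
        # Requests using any other Content-Encoding (zstd, snappy, ...) are
        # rejected by the decompressor with 400 Bad Request.
        compression_algorithms: ["", "gzip"]
```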
106 changes: 58 additions & 48 deletions config/confighttp/compression.go
@@ -25,6 +25,53 @@
compressor *compressor
}

var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){
"": func(io.ReadCloser) (io.ReadCloser, error) {
// Not a compressed payload. Nothing to do.
return nil, nil
},
"gzip": func(body io.ReadCloser) (io.ReadCloser, error) {
gr, err := gzip.NewReader(body)
if err != nil {
return nil, err
}
return gr, nil
},
"zstd": func(body io.ReadCloser) (io.ReadCloser, error) {
zr, err := zstd.NewReader(
body,
// Concurrency 1 disables async decoding. We don't need async decoding, it is pointless
// for our use-case (a server accepting decoding http requests).
// Disabling async improves performance (I benchmarked it previously when working
// on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257).
zstd.WithDecoderConcurrency(1),
)
if err != nil {
return nil, err
}
return zr.IOReadCloser(), nil
},
"zlib": func(body io.ReadCloser) (io.ReadCloser, error) {
zr, err := zlib.NewReader(body)
if err != nil {
return nil, err
}
return zr, nil
},
"snappy": func(body io.ReadCloser) (io.ReadCloser, error) {
sr := snappy.NewReader(body)
sb := new(bytes.Buffer)
_, err := io.Copy(sb, sr)
if err != nil {
return nil, err
}
if err = body.Close(); err != nil {
return nil, err
}
return io.NopCloser(sb), nil
},
}

func newCompressRoundTripper(rt http.RoundTripper, compressionType configcompression.Type) (*compressRoundTripper, error) {
encoder, err := newCompressor(compressionType)
if err != nil {
@@ -77,64 +124,27 @@
// by identifying the compression format in the "Content-Encoding" header and re-writing
// request body so that the handlers further in the chain can work on decompressed data.
// It supports gzip and deflate/zlib compression.
func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler {
func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int), enableDecoders []string, decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)) http.Handler {
errHandler := defaultErrorHandler
if eh != nil {
errHandler = eh
}

enabled := map[string]func(body io.ReadCloser) (io.ReadCloser, error){}
for _, dec := range enableDecoders {
enabled[dec] = availableDecoders[dec]

if dec == "deflate" {
enabled["deflate"] = availableDecoders["zlib"]
}
}

d := &decompressor{
maxRequestBodySize: maxRequestBodySize,
errHandler: errHandler,
base: h,
decoders: map[string]func(body io.ReadCloser) (io.ReadCloser, error){
"": func(io.ReadCloser) (io.ReadCloser, error) {
// Not a compressed payload. Nothing to do.
return nil, nil
},
"gzip": func(body io.ReadCloser) (io.ReadCloser, error) {
gr, err := gzip.NewReader(body)
if err != nil {
return nil, err
}
return gr, nil
},
"zstd": func(body io.ReadCloser) (io.ReadCloser, error) {
zr, err := zstd.NewReader(
body,
// Concurrency 1 disables async decoding. We don't need async decoding, it is pointless
// for our use-case (a server accepting decoding http requests).
// Disabling async improves performance (I benchmarked it previously when working
// on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257).
zstd.WithDecoderConcurrency(1),
)
if err != nil {
return nil, err
}
return zr.IOReadCloser(), nil
},
"zlib": func(body io.ReadCloser) (io.ReadCloser, error) {
zr, err := zlib.NewReader(body)
if err != nil {
return nil, err
}
return zr, nil
},
"snappy": func(body io.ReadCloser) (io.ReadCloser, error) {
sr := snappy.NewReader(body)
sb := new(bytes.Buffer)
_, err := io.Copy(sb, sr)
if err != nil {
return nil, err
}
if err = body.Close(); err != nil {
return nil, err
}
return io.NopCloser(sb), nil
},
},
decoders: enabled,
}
d.decoders["deflate"] = d.decoders["zlib"]

for key, dec := range decoders {
d.decoders[key] = dec
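To make the new gating easier to follow outside the diff, here is a minimal, self-contained sketch of the idea: only the configured algorithm names are copied from the full decoder set, and a Content-Encoding that is not in the resulting map is rejected. The `decoderFunc` type, the stand-in decoders, and the printed message are illustrative; the 400 response is taken from the test added below.

```go
package main

import (
	"fmt"
	"io"
)

// decoderFunc matches the shape of the decoder functions in compression.go.
type decoderFunc func(body io.ReadCloser) (io.ReadCloser, error)

// enableDecoders mirrors the filtering added to httpContentDecompressor:
// copy only the configured algorithms from the full set and alias "deflate"
// to the zlib decoder.
func enableDecoders(configured []string, available map[string]decoderFunc) map[string]decoderFunc {
	enabled := map[string]decoderFunc{}
	for _, dec := range configured {
		enabled[dec] = available[dec]
		if dec == "deflate" {
			enabled["deflate"] = available["zlib"]
		}
	}
	return enabled
}

func main() {
	// Stand-in decoders; the real ones wrap gzip/zstd/zlib/snappy readers.
	passthrough := func(body io.ReadCloser) (io.ReadCloser, error) { return body, nil }
	available := map[string]decoderFunc{
		"":     func(io.ReadCloser) (io.ReadCloser, error) { return nil, nil },
		"gzip": passthrough,
		"zlib": passthrough,
		"zstd": passthrough,
	}

	enabled := enableDecoders([]string{"", "gzip"}, available)
	if _, ok := enabled["zstd"]; !ok {
		// In the real handler, a disabled Content-Encoding is where the error
		// handler answers 400 Bad Request (see TestOverrideCompressionList).
		fmt.Println("zstd not enabled: request would be rejected")
	}
}
```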
29 changes: 27 additions & 2 deletions config/confighttp/compression_test.go
@@ -134,7 +134,7 @@ func TestHTTPCustomDecompression(t *testing.T) {
return io.NopCloser(strings.NewReader("decompressed body")), nil
},
}
srv := httptest.NewServer(httpContentDecompressor(handler, defaultMaxRequestBodySize, defaultErrorHandler, decoders))
srv := httptest.NewServer(httpContentDecompressor(handler, defaultMaxRequestBodySize, defaultErrorHandler, defaultCompressionAlgorithms, decoders))

t.Cleanup(srv.Close)

@@ -253,7 +253,7 @@ func TestHTTPContentDecompressionHandler(t *testing.T) {
require.NoError(t, err, "failed to read request body: %v", err)
assert.EqualValues(t, testBody, string(body))
w.WriteHeader(http.StatusOK)
}), defaultMaxRequestBodySize, defaultErrorHandler, noDecoders))
}), defaultMaxRequestBodySize, defaultErrorHandler, defaultCompressionAlgorithms, noDecoders))
t.Cleanup(srv.Close)

req, err := http.NewRequest(http.MethodGet, srv.URL, tt.reqBody)
@@ -349,6 +349,31 @@ func TestHTTPContentCompressionRequestBodyCloseError(t *testing.T) {
require.Error(t, err)
}

func TestOverrideCompressionList(t *testing.T) {
// prepare
configuredDecoders := []string{"none", "zlib"}

srv := httptest.NewServer(httpContentDecompressor(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
}), defaultMaxRequestBodySize, defaultErrorHandler, configuredDecoders, nil))
t.Cleanup(srv.Close)

req, err := http.NewRequest(http.MethodGet, srv.URL, compressSnappy(t, []byte("123decompressed body")))
require.NoError(t, err, "failed to create request to test handler")
req.Header.Set("Content-Encoding", "snappy")

client := http.Client{}

// test
res, err := client.Do(req)
require.NoError(t, err)

// verify
assert.Equal(t, http.StatusBadRequest, res.StatusCode, "test handler returned unexpected status code ")
_, err = io.ReadAll(res.Body)
require.NoError(t, res.Body.Close(), "failed to close request body: %v", err)
}

func compressGzip(t testing.TB, body []byte) *bytes.Buffer {
var buf bytes.Buffer
gw := gzip.NewWriter(&buf)
10 changes: 9 additions & 1 deletion config/confighttp/confighttp.go
@@ -31,6 +31,7 @@ import (

const headerContentEncoding = "Content-Encoding"
const defaultMaxRequestBodySize = 20 * 1024 * 1024 // 20MiB
var defaultCompressionAlgorithms = []string{"", "gzip", "zstd", "zlib", "snappy", "deflate"}

// ClientConfig defines settings for creating an HTTP client.
type ClientConfig struct {
@@ -280,6 +281,9 @@ type ServerConfig struct {
// Additional headers attached to each HTTP response sent to the client.
// Header values are opaque since they may be sensitive.
ResponseHeaders map[string]configopaque.String `mapstructure:"response_headers"`

// CompressionAlgorithms configures the list of compression algorithms the server can accept. Default: ["", "gzip", "zstd", "zlib", "snappy", "deflate"]
CompressionAlgorithms []string `mapstructure:"compression_algorithms"`
}

// ToListener creates a net.Listener.
@@ -345,7 +349,11 @@ func (hss *ServerConfig) ToServer(_ context.Context, host component.Host, settin
hss.MaxRequestBodySize = defaultMaxRequestBodySize
}

handler = httpContentDecompressor(handler, hss.MaxRequestBodySize, serverOpts.errHandler, serverOpts.decoders)
if hss.CompressionAlgorithms == nil {
hss.CompressionAlgorithms = defaultCompressionAlgorithms
}

handler = httpContentDecompressor(handler, hss.MaxRequestBodySize, serverOpts.errHandler, hss.CompressionAlgorithms, serverOpts.decoders)

if hss.MaxRequestBodySize > 0 {
handler = maxRequestBodySizeInterceptor(handler, hss.MaxRequestBodySize)
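To round out the picture, a hedged usage sketch of the new field follows. Only `ServerConfig.CompressionAlgorithms` and the nil-means-default behaviour are established by this diff; the endpoint value is an illustrative assumption.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/config/confighttp"
)

func main() {
	cfg := confighttp.ServerConfig{
		Endpoint: "0.0.0.0:4318",
		// Accept only uncompressed and gzip-encoded request bodies.
		CompressionAlgorithms: []string{"", "gzip"},
	}

	// Leaving CompressionAlgorithms nil preserves the previous behaviour:
	// ToServer falls back to defaultCompressionAlgorithms
	// ("", gzip, zstd, zlib, snappy, deflate).
	fmt.Println(cfg.CompressionAlgorithms)
}
```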