From 605beb88a2899f78394e77ec9042c78867cc693d Mon Sep 17 00:00:00 2001 From: "Marcelo E. Magallon" Date: Wed, 10 Mar 2021 08:05:14 -0600 Subject: [PATCH] Add support for decompression of HTTP responses If the module configuration specifies the "compression" option blackbox_exporter will try to decompress the response using the specified algorithm. If the response is not compressed using that algorithm, the probe will fail. It validates that the "Accept-Encoding" header is either absent, or that it specifies the same algorithm as the "compression" option. If the "Accept-Encoding" header is present but it specifies a different algorithm, the probe will fail. If the compression option is *not* used, probe_http_content_length and probe_http_uncompressed_body_length will have the same value corresponding to the original content length. If the compression option is used and the content can be decompressed, probe_http_content_length will report the original content length as it currently does, and probe_http_uncompressed_body_length will report the length of the body after decompression as expected. Fixes #684 Signed-off-by: Marcelo E. Magallon --- CONFIGURATION.md | 3 + config/config.go | 1 + example.yml | 12 ++ go.mod | 1 + go.sum | 2 + prober/http.go | 67 ++++++++++- prober/http_test.go | 270 ++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 353 insertions(+), 3 deletions(-) diff --git a/CONFIGURATION.md b/CONFIGURATION.md index dbb852a3d..1e6b54bd6 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -49,6 +49,9 @@ The other placeholders are specified separately. headers: [ : ... ] + # The compression algorithm to use to decompress the response. + [ compression: | default = "" ] + # Whether or not the probe will follow any redirects. [ no_follow_redirects: | default = false ] diff --git a/config/config.go b/config/config.go index ee460106b..9f8a3e0d0 100644 --- a/config/config.go +++ b/config/config.go @@ -188,6 +188,7 @@ type HTTPProbe struct { FailIfHeaderNotMatchesRegexp []HeaderMatch `yaml:"fail_if_header_not_matches,omitempty"` Body string `yaml:"body,omitempty"` HTTPClientConfig config.HTTPClientConfig `yaml:"http_client_config,inline"` + Compression string `yaml:"compression,omitempty"` } type HeaderMatch struct { diff --git a/example.yml b/example.yml index cdba7a14c..458c290f4 100644 --- a/example.yml +++ b/example.yml @@ -52,6 +52,18 @@ modules: method: GET tls_config: ca_file: "/certs/my_cert.crt" + http_gzip: + prober: http + http: + method: GET + compression: gzip + http_gzip_with_accept_encoding: + prober: http + http: + method: GET + compression: gzip + headers: + Accept-Encoding: gzip tls_connect: prober: tcp timeout: 5s diff --git a/go.mod b/go.mod index 3f135c298..0be4c24d5 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/prometheus/blackbox_exporter require ( + github.com/andybalholm/brotli v1.0.1 github.com/go-kit/kit v0.10.0 github.com/miekg/dns v1.1.40 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 6e2ab0321..8f726443a 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d h1:UQZhZ2O0vMHr2cI+DC1Mbh0TJxzA3RcLoMsFw+aXw7E= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/andybalholm/brotli v1.0.1 h1:KqhlKozYbRtJvsPrrEeXcO+N2l6NYT5A2QAFmSULpEc= +github.com/andybalholm/brotli v1.0.1/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= diff --git a/prober/http.go b/prober/http.go index 500c92b56..31f22c6d0 100644 --- a/prober/http.go +++ b/prober/http.go @@ -14,6 +14,8 @@ package prober import ( + "compress/flate" + "compress/gzip" "context" "errors" "fmt" @@ -30,6 +32,7 @@ import ( "sync" "time" + "github.com/andybalholm/brotli" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" @@ -397,10 +400,23 @@ func ProbeHTTP(ctx context.Context, target string, module config.Module, registr request = request.WithContext(ctx) for key, value := range httpConfig.Headers { - if strings.Title(key) == "Host" { + normalizedKey := strings.Title(key) + + if normalizedKey == "Host" { request.Host = value continue } + + // If there's a compression setting, and there's also an + // accept-encoding header, they MUST match, otherwise we + // end up requesting one encoding and trying to process + // a different one, which is more likely than not going + // to fail. + if httpConfig.Compression != "" && normalizedKey == "Accept-Encoding" && !strings.EqualFold(value, httpConfig.Compression) { + level.Error(logger).Log("msg", "Invalid configuration", key, value, "compression", httpConfig.Compression) + return + } + request.Header.Set(key, value) } @@ -455,6 +471,31 @@ func ProbeHTTP(ctx context.Context, target string, module config.Module, registr } } + // Since the configuration specifies a compression algorithm, blindly treat the response body as a + // compressed payload; if we cannot decompress it it's a failure because the configuration says we + // should expect the response to be compressed in that way. + if httpConfig.Compression != "" { + dec, err := getDecompressionReader(httpConfig.Compression, resp.Body) + if err != nil { + level.Info(logger).Log("msg", "Failed to get decompressor for HTTP response body", "err", err.Error()) + success = false + } else if dec != nil { + // Since we are replacing the original resp.Body with the decoder, we need to make sure + // we close the original body. We cannot close it right away because the decompressor + // might not have read it yet. + defer func(c io.Closer) { + err := c.Close() + if err != nil { + // At this point we cannot really do anything with this error, but log + // it in case it contains useful information as to what's the problem. + level.Info(logger).Log("msg", "Error while closing response from server", "error", err.Error()) + } + }(resp.Body) + + resp.Body = dec + } + } + byteCounter := &byteCounter{ReadCloser: resp.Body} if success && (len(httpConfig.FailIfBodyMatchesRegexp) > 0 || len(httpConfig.FailIfBodyNotMatchesRegexp) > 0) { @@ -476,8 +517,9 @@ func ProbeHTTP(ctx context.Context, target string, module config.Module, registr respBodyBytes = byteCounter.n if err := byteCounter.Close(); err != nil { - // We have already read everything we could from the server. The error here might be a - // TCP error. Log it in case it contains useful information as to what's the problem. + // We have already read everything we could from the server, maybe even uncompressed the + // body. The error here might be either a decompression error or a TCP error. Log it in + // case it contains useful information as to what's the problem. level.Info(logger).Log("msg", "Error while closing response from server", "error", err.Error()) } } @@ -578,3 +620,22 @@ func ProbeHTTP(ctx context.Context, target string, module config.Module, registr redirectsGauge.Set(float64(redirects)) return } + +func getDecompressionReader(algorithm string, origBody io.ReadCloser) (io.ReadCloser, error) { + switch strings.ToLower(algorithm) { + case "br": + return ioutil.NopCloser(brotli.NewReader(origBody)), nil + + case "deflate": + return flate.NewReader(origBody), nil + + case "gzip": + return gzip.NewReader(origBody) + + case "identity", "": + return origBody, nil + + default: + return nil, errors.New("unsupported compression algorithm") + } +} diff --git a/prober/http_test.go b/prober/http_test.go index ec7106ae6..0b2d854f5 100644 --- a/prober/http_test.go +++ b/prober/http_test.go @@ -15,6 +15,8 @@ package prober import ( "bytes" + "compress/flate" + "compress/gzip" "context" "crypto/tls" "crypto/x509" @@ -29,6 +31,7 @@ import ( "testing" "time" + "github.com/andybalholm/brotli" "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus" pconfig "github.com/prometheus/common/config" @@ -101,6 +104,7 @@ func TestValidHTTPVersion(t *testing.T) { func TestContentLength(t *testing.T) { type testdata struct { + msg []byte contentLength int uncompressedBodyLength int handler http.HandlerFunc @@ -113,6 +117,7 @@ func TestContentLength(t *testing.T) { testcases := map[string]testdata{ "identity": { + msg: testmsg, contentLength: len(testmsg), uncompressedBodyLength: len(testmsg), handler: func(w http.ResponseWriter, r *http.Request) { @@ -123,6 +128,7 @@ func TestContentLength(t *testing.T) { }, "no content-encoding": { + msg: testmsg, contentLength: len(testmsg), uncompressedBodyLength: len(testmsg), handler: func(w http.ResponseWriter, r *http.Request) { @@ -133,6 +139,7 @@ func TestContentLength(t *testing.T) { // Unknown Content-Encoding, we should let this pass thru. "unknown content-encoding": { + msg: testmsg, contentLength: len(testmsg), uncompressedBodyLength: len(testmsg), handler: func(w http.ResponseWriter, r *http.Request) { @@ -145,6 +152,7 @@ func TestContentLength(t *testing.T) { // 401 response, verify that the content-length is still computed correctly. "401": { expectFailure: true, + msg: notfoundMsg, contentLength: len(notfoundMsg), uncompressedBodyLength: len(notfoundMsg), handler: func(w http.ResponseWriter, r *http.Request) { @@ -153,6 +161,64 @@ func TestContentLength(t *testing.T) { w.Write(notfoundMsg) }, }, + + // Compressed payload _without_ compression setting, it should not be decompressed. + "brotli": func() testdata { + msg := testmsg + var buf bytes.Buffer + fw := brotli.NewWriter(&buf) + fw.Write([]byte(msg)) + fw.Close() + return testdata{ + msg: msg, + contentLength: len(buf.Bytes()), // Content lenght is the length of the compressed buffer. + uncompressedBodyLength: len(buf.Bytes()), // No decompression. + handler: func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Encoding", "br") + w.WriteHeader(http.StatusOK) + w.Write(buf.Bytes()) + }, + } + }(), + + // Compressed payload _without_ compression setting, it should not be decompressed. + "deflate": func() testdata { + msg := testmsg + var buf bytes.Buffer + // the only error path is an invalid compression level + fw, _ := flate.NewWriter(&buf, flate.DefaultCompression) + fw.Write([]byte(msg)) + fw.Close() + return testdata{ + msg: msg, + contentLength: len(buf.Bytes()), // Content lenght is the length of the compressed buffer. + uncompressedBodyLength: len(buf.Bytes()), // No decompression. + handler: func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Encoding", "deflate") + w.WriteHeader(http.StatusOK) + w.Write(buf.Bytes()) + }, + } + }(), + + // Compressed payload _without_ compression setting, it should not be decompressed. + "gzip": func() testdata { + msg := testmsg + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + gw.Write([]byte(msg)) + gw.Close() + return testdata{ + msg: msg, + contentLength: len(buf.Bytes()), // Content lenght is the length of the compressed buffer. + uncompressedBodyLength: len(buf.Bytes()), // No decompression. + handler: func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Encoding", "gzip") + w.WriteHeader(http.StatusOK) + w.Write(buf.Bytes()) + }, + } + }(), } for name, tc := range testcases { @@ -193,6 +259,210 @@ func TestContentLength(t *testing.T) { } } +// TestHandlingOfCompressionSetting verifies that the "compression" +// setting is handled correctly: content is decompressed only if +// compression is specified, and only the specified compression +// algorithm is handled. +func TestHandlingOfCompressionSetting(t *testing.T) { + type testdata struct { + contentLength int + uncompressedBodyLength int + handler http.HandlerFunc + expectFailure bool + httpConfig config.HTTPProbe + } + + testmsg := []byte(strings.Repeat("hello world", 10)) + + testcases := map[string]testdata{ + "gzip": func() testdata { + msg := testmsg + var buf bytes.Buffer + enc := gzip.NewWriter(&buf) + enc.Write(msg) + enc.Close() + return testdata{ + contentLength: buf.Len(), // Content lenght is the length of the compressed buffer. + uncompressedBodyLength: len(msg), + handler: func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Encoding", "gzip") + w.WriteHeader(http.StatusOK) + w.Write(buf.Bytes()) + }, + httpConfig: config.HTTPProbe{ + IPProtocolFallback: true, + Compression: "gzip", + }, + } + }(), + + "brotli": func() testdata { + msg := testmsg + var buf bytes.Buffer + enc := brotli.NewWriter(&buf) + enc.Write(msg) + enc.Close() + return testdata{ + contentLength: len(buf.Bytes()), // Content lenght is the length of the compressed buffer. + uncompressedBodyLength: len(msg), + handler: func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Encoding", "br") + w.WriteHeader(http.StatusOK) + w.Write(buf.Bytes()) + }, + httpConfig: config.HTTPProbe{ + IPProtocolFallback: true, + Compression: "br", + }, + } + }(), + + "deflate": func() testdata { + msg := testmsg + var buf bytes.Buffer + // the only error path is an invalid compression level + enc, _ := flate.NewWriter(&buf, flate.DefaultCompression) + enc.Write(msg) + enc.Close() + return testdata{ + contentLength: len(buf.Bytes()), // Content lenght is the length of the compressed buffer. + uncompressedBodyLength: len(msg), + handler: func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Encoding", "deflate") + w.WriteHeader(http.StatusOK) + w.Write(buf.Bytes()) + }, + httpConfig: config.HTTPProbe{ + IPProtocolFallback: true, + Compression: "deflate", + }, + } + }(), + + "identity": { + contentLength: len(testmsg), + uncompressedBodyLength: len(testmsg), + handler: func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Encoding", "identity") + w.WriteHeader(http.StatusOK) + w.Write(testmsg) + }, + httpConfig: config.HTTPProbe{ + IPProtocolFallback: true, + Compression: "identity", + }, + }, + + // We do exactly as told: the server is returning a + // gzip-encoded response, but the module is expecting a + // delfate-encoded response. This should fail. + "compression encoding mismatch": func() testdata { + msg := testmsg + var buf bytes.Buffer + enc := gzip.NewWriter(&buf) + enc.Write(msg) + enc.Close() + return testdata{ + expectFailure: true, + contentLength: buf.Len(), // Content lenght is the length of the compressed buffer. + uncompressedBodyLength: 0, + handler: func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Encoding", "gzip") + w.WriteHeader(http.StatusOK) + w.Write(buf.Bytes()) + }, + httpConfig: config.HTTPProbe{ + IPProtocolFallback: true, + Compression: "deflate", + }, + } + }(), + + "header mismatch": func() testdata { + msg := testmsg + var buf bytes.Buffer + enc := gzip.NewWriter(&buf) + enc.Write(msg) + enc.Close() + return testdata{ + expectFailure: true, + contentLength: 0, // Content won't be fetched because the configuration is invalid. + uncompressedBodyLength: 0, + handler: func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Encoding", "gzip") + w.WriteHeader(http.StatusOK) + w.Write(buf.Bytes()) + }, + httpConfig: config.HTTPProbe{ + IPProtocolFallback: true, + Compression: "gzip", + Headers: map[string]string{ + "Accept-Encoding": "deflate", + }, + }, + } + }(), + + "compressed content without compression setting": func() testdata { + msg := testmsg + var buf bytes.Buffer + enc := gzip.NewWriter(&buf) + enc.Write(msg) + enc.Close() + return testdata{ + expectFailure: false, + contentLength: buf.Len(), + uncompressedBodyLength: buf.Len(), // content won't be uncompressed + handler: func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Encoding", "gzip") + w.WriteHeader(http.StatusOK) + w.Write(buf.Bytes()) + }, + httpConfig: config.HTTPProbe{ + IPProtocolFallback: true, + }, + } + }(), + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + ts := httptest.NewServer(tc.handler) + defer ts.Close() + + testCTX, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + registry := prometheus.NewRegistry() + var logbuf bytes.Buffer + result := ProbeHTTP(testCTX, + ts.URL, + config.Module{ + Timeout: time.Second, + HTTP: tc.httpConfig, + }, + registry, + log.NewLogfmtLogger(&logbuf)) + if !tc.expectFailure && !result { + t.Fatalf("probe failed unexpectedly: %s", logbuf.String()) + } else if tc.expectFailure && result { + t.Fatalf("probe succeeded unexpectedly: %s", logbuf.String()) + } + + mfs, err := registry.Gather() + if err != nil { + t.Fatal(err) + } + + expectedResults := map[string]float64{ + "probe_http_content_length": float64(tc.contentLength), + "probe_http_uncompressed_body_length": float64(tc.uncompressedBodyLength), + } + checkRegistryResults(expectedResults, mfs, t) + }) + } +} + func TestRedirectFollowed(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/" {