diff --git a/cache/async_cache.go b/cache/async_cache.go index e8223c1a..51013c12 100644 --- a/cache/async_cache.go +++ b/cache/async_cache.go @@ -20,6 +20,8 @@ type AsyncCache struct { TransactionRegistry graceTime time.Duration + + MaxPayloadSize config.ByteSize } func (c *AsyncCache) Close() error { @@ -103,9 +105,12 @@ func NewAsyncCache(cfg config.Cache, maxExecutionTime time.Duration) (*AsyncCach return nil, err } + maxPayloadSize := cfg.MaxPayloadSize + return &AsyncCache{ Cache: cache, TransactionRegistry: transaction, graceTime: graceTime, + MaxPayloadSize: maxPayloadSize, }, nil } diff --git a/cache/async_cache_test.go b/cache/async_cache_test.go index ed059cb1..276adee1 100644 --- a/cache/async_cache_test.go +++ b/cache/async_cache_test.go @@ -213,7 +213,8 @@ func TestAsyncCache_FilesystemCache_instantiation(t *testing.T) { Dir: asyncTestDir, MaxSize: 8192, }, - Expire: config.Duration(time.Minute), + Expire: config.Duration(time.Minute), + MaxPayloadSize: config.ByteSize(100000000), } if err := os.RemoveAll(testDirAsync); err != nil { log.Fatalf("cannot remove %q: %s", testDirAsync, err) @@ -248,7 +249,8 @@ func TestAsyncCache_RedisCache_instantiation(t *testing.T) { Redis: config.RedisCacheConfig{ Addresses: []string{s.Addr()}, }, - Expire: config.Duration(cacheTTL), + Expire: config.Duration(cacheTTL), + MaxPayloadSize: config.ByteSize(100000000), } _, err := NewAsyncCache(redisCfg, 1*time.Second) diff --git a/config/README.md b/config/README.md index 13bd84e5..4f3f3145 100644 --- a/config/README.md +++ b/config/README.md @@ -79,6 +79,10 @@ expire: # By default `grace_time` is 5s. Negative value disables the protection # from `thundering herd` problem. grace_time: + +# Maximum total size of request payload for caching. The default value +# is set to 1 Petabyte. +max_payload_size: ``` ### @@ -113,6 +117,11 @@ expire: # By default `grace_time` is 5s. Negative value disables the protection # from `thundering herd` problem. grace_time: + +# Maximum total size of request payload for caching. The default value +# is set to 1 Petabyte. +# The default value set so high is to allow users who do not use response size limitations virtually unlimited cache. +max_payload_size: ``` ### diff --git a/config/config.go b/config/config.go index 83592ee1..a56de18f 100644 --- a/config/config.go +++ b/config/config.go @@ -32,6 +32,8 @@ var ( } defaultExecutionTime = Duration(120 * time.Second) + + defaultMaxPayloadSize = ByteSize(1 << 50) ) // Config describes server configuration, access and proxy rules @@ -601,6 +603,9 @@ type Cache struct { // Catches all undefined fields XXX map[string]interface{} `yaml:",inline"` + + // Maximum total size of request payload for caching + MaxPayloadSize ByteSize `yaml:"max_payload_size,omitempty"` } type FileSystemCacheConfig struct { @@ -812,6 +817,13 @@ func LoadFile(filename string) (*Config, error) { } } + for i := range cfg.Caches { + c := &cfg.Caches[i] + if c.MaxPayloadSize <= 0 { + c.MaxPayloadSize = defaultMaxPayloadSize + } + } + if maxResponseTime < 0 { maxResponseTime = 0 } diff --git a/config/config_test.go b/config/config_test.go index 3b67eaab..d83a3b34 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -21,8 +21,9 @@ var fullConfig = Config{ Dir: "/path/to/longterm/cachedir", MaxSize: ByteSize(100 << 30), }, - Expire: Duration(time.Hour), - GraceTime: Duration(20 * time.Second), + Expire: Duration(time.Hour), + GraceTime: Duration(20 * time.Second), + MaxPayloadSize: ByteSize(100 << 30), }, { Name: "shortterm", @@ -31,7 +32,8 @@ var fullConfig = Config{ Dir: "/path/to/shortterm/cachedir", MaxSize: ByteSize(100 << 20), }, - Expire: Duration(10 * time.Second), + Expire: Duration(10 * time.Second), + MaxPayloadSize: ByteSize(100 << 20), }, }, HackMePlease: true, @@ -441,6 +443,11 @@ func TestBadConfig(t *testing.T) { "testdata/bad.heartbeat_section.empty.yml", "`cluster.heartbeat` cannot be unset for \"cluster\"", }, + { + "max payload size to cache", + "testdata/bad.max_payload_size.yml", + "cannot parse byte size \"-10B\": it must be positive float followed by optional units. For example, 1.5Gb, 3T", + }, } for _, tc := range testCases { @@ -815,12 +822,14 @@ caches: file_system: dir: /path/to/longterm/cachedir max_size: 107374182400 + max_payload_size: 107374182400 - mode: file_system name: shortterm expire: 10s file_system: dir: /path/to/shortterm/cachedir max_size: 104857600 + max_payload_size: 104857600 param_groups: - name: cron-job params: diff --git a/config/testdata/bad.max_payload_size.yml b/config/testdata/bad.max_payload_size.yml new file mode 100644 index 00000000..3607e06c --- /dev/null +++ b/config/testdata/bad.max_payload_size.yml @@ -0,0 +1,21 @@ +caches: + - name: "longterm" + mode: "file_system" + max_payload_size: "-10B" + file_system: + dir: "cache_dir" + max_size: 100Gb + +server: + http: + listen_addr: ":8080" + +users: + - name: "dummy" + allowed_networks: ["1.2.3.4"] + to_cluster: "cluster" + to_user: "default" + +clusters: + - name: "cluster" + nodes: ["127.0.1.1:8123"] diff --git a/config/testdata/full.yml b/config/testdata/full.yml index ca5fe90c..718a03d0 100644 --- a/config/testdata/full.yml +++ b/config/testdata/full.yml @@ -26,6 +26,8 @@ caches: # Path to directory where cached responses will be stored. dir: "/path/to/longterm/cachedir" + max_payload_size: 100Gb + # Expiration time for cached responses. expire: 1h @@ -44,6 +46,7 @@ caches: file_system: max_size: 100Mb dir: "/path/to/shortterm/cachedir" + max_payload_size: 100Mb expire: 10s # Optional network lists, might be used as values for `allowed_networks`. diff --git a/docs/content/en/configuration/caching.md b/docs/content/en/configuration/caching.md index 565e68a4..c5a6f64e 100644 --- a/docs/content/en/configuration/caching.md +++ b/docs/content/en/configuration/caching.md @@ -31,4 +31,7 @@ Distributed cache relies on external database to share cache across multiple rep multiple replicas deployments. Currently only [Redis](https://redis.io/) key value store is supported. Configuration template for distributed cache can be found [here](https://github.com/ContentSquare/chproxy/blob/master/config/#distributed_cache_config). - +#### Response limitations for caching +Before caching Clickhouse response, chproxy verifies that the response size +is not greater than configured max size. This setting can be specified in config section of the cache `max_payload_size`. The default value +is set to 1 Petabyte. Therefore, by default this security mechanism is disabled. \ No newline at end of file diff --git a/main_test.go b/main_test.go index 004ebcf5..5ee79da9 100644 --- a/main_test.go +++ b/main_test.go @@ -117,6 +117,90 @@ func TestServe(t *testing.T) { }, startTLS, }, + { + "https cache max payload size", + "testdata/https.cache.max-payload-size.yml", + func(t *testing.T) { + q := "SELECT MaxPayloadSize" + req, err := http.NewRequest("GET", "https://127.0.0.1:8443?query="+url.QueryEscape(q), nil) + checkErr(t, err) + req.SetBasicAuth("default", "qwerty") + req.Close = true + + resp, err := tlsClient.Do(req) + checkErr(t, err) + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK) + } + + checkResponse(t, resp.Body, expectedOkResp) + + key := &cache.Key{ + Query: []byte(q), + AcceptEncoding: "gzip", + Version: cache.Version, + } + + cc := proxy.caches["https_cache_max_payload_size"] + cachedData, err := cc.Get(key) + + if cachedData != nil || err == nil { + t.Fatal("skipped response from cache is expected") + } + + resp.Body.Close() + }, + startTLS, + }, + { + "https cache max payload size not reached", + "testdata/https.cache.max-payload-size-not-reached.yml", + func(t *testing.T) { + q := "SELECT MaxPayloadSize" + req, err := http.NewRequest("GET", "https://127.0.0.1:8443?query="+url.QueryEscape(q), nil) + checkErr(t, err) + req.SetBasicAuth("default", "qwerty") + req.Close = true + + resp, err := tlsClient.Do(req) + checkErr(t, err) + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK) + } + + checkResponse(t, resp.Body, expectedOkResp) + + key := &cache.Key{ + Query: []byte(q), + AcceptEncoding: "gzip", + Version: cache.Version, + } + + path := fmt.Sprintf("%s/cache/%s", testDir, key.String()) + if _, err := os.Stat(path); err != nil { + t.Fatalf("err while getting file %q info: %s", path, err) + } + + rw := httptest.NewRecorder() + + cc := proxy.caches["https_cache_max_payload_size"] + cachedData, err := cc.Get(key) + + if err != nil { + t.Fatalf("unexpected error while getting response from cache: %s", err) + } + + err = RespondWithData(rw, cachedData.Data, cachedData.ContentMetadata, cachedData.Ttl, 200) + if err != nil { + t.Fatalf("unexpected error while getting response from cache: %s", err) + } + checkResponse(t, rw.Body, expectedOkResp) + + cachedData.Data.Close() + resp.Body.Close() + }, + startTLS, + }, { "https cache with mix query source", "testdata/https.cache.yml", @@ -339,6 +423,82 @@ func TestServe(t *testing.T) { }, startHTTP, }, + { + "http cache max payload size", + "testdata/http.cache.max-payload-size.yml", + func(t *testing.T) { + q := "SELECT MaxPayloadSize" + req, err := http.NewRequest("GET", "http://127.0.0.1:9090?query="+url.QueryEscape(q), nil) + checkErr(t, err) + req.SetBasicAuth("default", "qwerty") + req.Close = true + + resp, err := httpRequest(t, req, http.StatusOK) + checkResponse(t, resp.Body, expectedOkResp) + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK) + } + + key := &cache.Key{ + Query: []byte(q), + AcceptEncoding: "gzip", + Version: cache.Version, + } + + cc := proxy.caches["http_cache_max_payload_size"] + cachedData, err := cc.Get(key) + + if cachedData != nil || err == nil { + t.Fatal("skipped response from cache is expected") + } + + resp.Body.Close() + }, + startHTTP, + }, + { + "http cache max payload size not reached", + "testdata/http.cache.max-payload-size-not-reached.yml", + func(t *testing.T) { + q := "SELECT MaxPayloadSize" + req, err := http.NewRequest("GET", "http://127.0.0.1:9090?query="+url.QueryEscape(q), nil) + checkErr(t, err) + req.SetBasicAuth("default", "qwerty") + req.Close = true + + resp, err := httpRequest(t, req, http.StatusOK) + checkResponse(t, resp.Body, expectedOkResp) + if resp.StatusCode != http.StatusOK { + t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK) + } + + key := &cache.Key{ + Query: []byte(q), + AcceptEncoding: "gzip", + Version: cache.Version, + } + + cc := proxy.caches["http_cache_max_payload_size"] + cachedData, err := cc.Get(key) + + if err != nil { + t.Fatalf("unexpected error while getting response from cache: %s", err) + } + + rw := httptest.NewRecorder() + + err = RespondWithData(rw, cachedData.Data, cachedData.ContentMetadata, cachedData.Ttl, 200) + if err != nil { + t.Fatalf("unexpected error while getting response from cache: %s", err) + } + checkResponse(t, rw.Body, expectedOkResp) + + cachedData.Data.Close() + + resp.Body.Close() + }, + startHTTP, + }, { "http requests with caching in redis ", "testdata/http.cache.redis.yml", @@ -794,6 +954,14 @@ func fakeCHHandler(w http.ResponseWriter, r *http.Request) { case q == "SELECT 1 FORMAT TabSeparatedWithNamesAndTypes": w.WriteHeader(http.StatusOK) w.Write(bytesWithInvalidUTFPairs) + case q == "SELECT MaxPayloadSize": + w.WriteHeader(http.StatusOK) + + // generate 10M payload size + b := make([]byte, 10485760) + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + fmt.Fprint(w, b) + fmt.Fprint(w, "Ok.\n") default: if strings.Contains(string(query), killQueryPattern) { fakeCHState.kill() diff --git a/metrics.go b/metrics.go index 5681a998..0da68ee9 100644 --- a/metrics.go +++ b/metrics.go @@ -115,6 +115,13 @@ var ( }, []string{"cache"}, ) + cacheSkipped = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "cache_payloadsize_too_big_total", + Help: "The amount of too big payloads to be cached", + }, + []string{"cache", "user", "cluster", "cluster_user"}, + ) requestDuration = prometheus.NewSummaryVec( prometheus.SummaryOpts{ Name: "request_duration_seconds", @@ -194,7 +201,7 @@ func registerMetrics() { limitExcess, hostPenalties, hostHealth, concurrentQueries, requestQueueSize, userQueueOverflow, clusterUserQueueOverflow, requestBodyBytes, responseBodyBytes, - cacheHit, cacheMiss, cacheSize, cacheItems, + cacheHit, cacheMiss, cacheSize, cacheItems, cacheSkipped, requestDuration, proxiedResponseDuration, cachedResponseDuration, canceledRequest, timeoutRequest, configSuccess, configSuccessTime, badRequest) diff --git a/proxy.go b/proxy.go index 8944fec1..887852be 100644 --- a/proxy.go +++ b/proxy.go @@ -351,9 +351,20 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h if err != nil { err = fmt.Errorf("%s: %w; query: %q", s, err, q) respondWith(srw, err, http.StatusInternalServerError) - return } } else { + // Do not cache responses greater than max payload size. + if contentLength > int64(s.user.cache.MaxPayloadSize) { + cacheSkipped.With(labels).Inc() + log.Debugf("%s: Request will not be cached. Content length (%d) is greater than max payload size (%d)", s, contentLength, s.user.cache.MaxPayloadSize) + + err = RespondWithData(srw, reader, contentMetadata, 0*time.Second, tmpFileRespWriter.StatusCode()) + if err != nil { + err = fmt.Errorf("%s: %w; query: %q", s, err, q) + respondWith(srw, err, http.StatusInternalServerError) + } + return + } cacheMiss.With(labels).Inc() log.Debugf("%s: cache miss", s) expiration, err := userCache.Put(reader, contentMetadata, key) diff --git a/testdata/http.cache.max-payload-size-not-reached.yml b/testdata/http.cache.max-payload-size-not-reached.yml new file mode 100644 index 00000000..94c14f3d --- /dev/null +++ b/testdata/http.cache.max-payload-size-not-reached.yml @@ -0,0 +1,26 @@ +log_debug: true + +caches: + - name: "http_cache_max_payload_size" + mode: "file_system" + max_payload_size: "50M" + file_system: + dir: "temp-test-data/cache" + max_size: "100M" + expire: "1m" + +server: + http: + listen_addr: ":9090" + allowed_networks: [ "127.0.0.1/24" ] + +users: + - name: "default" + password: "qwerty" + to_cluster: "default" + to_user: "default" + cache: "http_cache_max_payload_size" + +clusters: + - name: "default" + nodes: ["127.0.0.1:8124"] diff --git a/testdata/http.cache.max-payload-size.yml b/testdata/http.cache.max-payload-size.yml new file mode 100644 index 00000000..5143d367 --- /dev/null +++ b/testdata/http.cache.max-payload-size.yml @@ -0,0 +1,26 @@ +log_debug: true + +caches: + - name: "http_cache_max_payload_size" + mode: "file_system" + max_payload_size: "8M" + file_system: + dir: "temp-test-data/cache" + max_size: "100M" + expire: "1m" + +server: + http: + listen_addr: ":9090" + allowed_networks: [ "127.0.0.1/24" ] + +users: + - name: "default" + password: "qwerty" + to_cluster: "default" + to_user: "default" + cache: "http_cache_max_payload_size" + +clusters: + - name: "default" + nodes: ["127.0.0.1:8124"] diff --git a/testdata/https.cache.max-payload-size-not-reached.yml b/testdata/https.cache.max-payload-size-not-reached.yml new file mode 100644 index 00000000..a1c77630 --- /dev/null +++ b/testdata/https.cache.max-payload-size-not-reached.yml @@ -0,0 +1,27 @@ +log_debug: true + +caches: + - name: "https_cache_max_payload_size" + mode: "file_system" + max_payload_size: "50M" + file_system: + dir: "temp-test-data/cache" + max_size: "100M" + expire: "1m" + +server: + https: + listen_addr: ":8443" + cert_file: "testdata/example.com.cert" + key_file: "testdata/example.com.key" + +users: + - name: "default" + password: "qwerty" + to_cluster: "default" + to_user: "default" + cache: "https_cache_max_payload_size" + +clusters: + - name: "default" + nodes: ["127.0.0.1:8124"] diff --git a/testdata/https.cache.max-payload-size.yml b/testdata/https.cache.max-payload-size.yml new file mode 100644 index 00000000..ec0742e2 --- /dev/null +++ b/testdata/https.cache.max-payload-size.yml @@ -0,0 +1,27 @@ +log_debug: true + +caches: + - name: "https_cache_max_payload_size" + mode: "file_system" + max_payload_size: "8M" + file_system: + dir: "temp-test-data/cache" + max_size: "100M" + expire: "1m" + +server: + https: + listen_addr: ":8443" + cert_file: "testdata/example.com.cert" + key_file: "testdata/example.com.key" + +users: + - name: "default" + password: "qwerty" + to_cluster: "default" + to_user: "default" + cache: "https_cache_max_payload_size" + +clusters: + - name: "default" + nodes: ["127.0.0.1:8124"]