Skip to content

Commit

Permalink
chore: Handle too big CH payloads for caching (#191)
Browse files Browse the repository at this point in the history
* max payload size for caching

* add tests max payload size to proxy_test.go
  • Loading branch information
sigua-cs authored Sep 9, 2022
1 parent b459dcc commit d257a95
Show file tree
Hide file tree
Showing 16 changed files with 314 additions and 8 deletions.
5 changes: 5 additions & 0 deletions cache/async_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type AsyncCache struct {
TransactionRegistry

graceTime time.Duration

MaxPayloadSize config.ByteSize
}

func (c *AsyncCache) Close() error {
Expand Down Expand Up @@ -103,9 +105,12 @@ func NewAsyncCache(cfg config.Cache, maxExecutionTime time.Duration) (*AsyncCach
return nil, err
}

maxPayloadSize := cfg.MaxPayloadSize

return &AsyncCache{
Cache: cache,
TransactionRegistry: transaction,
graceTime: graceTime,
MaxPayloadSize: maxPayloadSize,
}, nil
}
6 changes: 4 additions & 2 deletions cache/async_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ func TestAsyncCache_FilesystemCache_instantiation(t *testing.T) {
Dir: asyncTestDir,
MaxSize: 8192,
},
Expire: config.Duration(time.Minute),
Expire: config.Duration(time.Minute),
MaxPayloadSize: config.ByteSize(100000000),
}
if err := os.RemoveAll(testDirAsync); err != nil {
log.Fatalf("cannot remove %q: %s", testDirAsync, err)
Expand Down Expand Up @@ -248,7 +249,8 @@ func TestAsyncCache_RedisCache_instantiation(t *testing.T) {
Redis: config.RedisCacheConfig{
Addresses: []string{s.Addr()},
},
Expire: config.Duration(cacheTTL),
Expire: config.Duration(cacheTTL),
MaxPayloadSize: config.ByteSize(100000000),
}

_, err := NewAsyncCache(redisCfg, 1*time.Second)
Expand Down
9 changes: 9 additions & 0 deletions config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ expire: <duration>
# By default `grace_time` is 5s. Negative value disables the protection
# from `thundering herd` problem.
grace_time: <duration>

# Maximum total size of request payload for caching. The default value
# is set to 1 Petabyte.
max_payload_size: <byte_size>
```
### <distributed_cache_config>
Expand Down Expand Up @@ -113,6 +117,11 @@ expire: <duration>
# By default `grace_time` is 5s. Negative value disables the protection
# from `thundering herd` problem.
grace_time: <duration>

# Maximum total size of request payload for caching. The default value
# is set to 1 Petabyte.
# The default value set so high is to allow users who do not use response size limitations virtually unlimited cache.
max_payload_size: <byte_size>
```
### <param_groups_config>
Expand Down
12 changes: 12 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ var (
}

defaultExecutionTime = Duration(120 * time.Second)

defaultMaxPayloadSize = ByteSize(1 << 50)
)

// Config describes server configuration, access and proxy rules
Expand Down Expand Up @@ -601,6 +603,9 @@ type Cache struct {

// Catches all undefined fields
XXX map[string]interface{} `yaml:",inline"`

// Maximum total size of request payload for caching
MaxPayloadSize ByteSize `yaml:"max_payload_size,omitempty"`
}

type FileSystemCacheConfig struct {
Expand Down Expand Up @@ -812,6 +817,13 @@ func LoadFile(filename string) (*Config, error) {
}
}

for i := range cfg.Caches {
c := &cfg.Caches[i]
if c.MaxPayloadSize <= 0 {
c.MaxPayloadSize = defaultMaxPayloadSize
}
}

if maxResponseTime < 0 {
maxResponseTime = 0
}
Expand Down
15 changes: 12 additions & 3 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ var fullConfig = Config{
Dir: "/path/to/longterm/cachedir",
MaxSize: ByteSize(100 << 30),
},
Expire: Duration(time.Hour),
GraceTime: Duration(20 * time.Second),
Expire: Duration(time.Hour),
GraceTime: Duration(20 * time.Second),
MaxPayloadSize: ByteSize(100 << 30),
},
{
Name: "shortterm",
Expand All @@ -31,7 +32,8 @@ var fullConfig = Config{
Dir: "/path/to/shortterm/cachedir",
MaxSize: ByteSize(100 << 20),
},
Expire: Duration(10 * time.Second),
Expire: Duration(10 * time.Second),
MaxPayloadSize: ByteSize(100 << 20),
},
},
HackMePlease: true,
Expand Down Expand Up @@ -441,6 +443,11 @@ func TestBadConfig(t *testing.T) {
"testdata/bad.heartbeat_section.empty.yml",
"`cluster.heartbeat` cannot be unset for \"cluster\"",
},
{
"max payload size to cache",
"testdata/bad.max_payload_size.yml",
"cannot parse byte size \"-10B\": it must be positive float followed by optional units. For example, 1.5Gb, 3T",
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -815,12 +822,14 @@ caches:
file_system:
dir: /path/to/longterm/cachedir
max_size: 107374182400
max_payload_size: 107374182400
- mode: file_system
name: shortterm
expire: 10s
file_system:
dir: /path/to/shortterm/cachedir
max_size: 104857600
max_payload_size: 104857600
param_groups:
- name: cron-job
params:
Expand Down
21 changes: 21 additions & 0 deletions config/testdata/bad.max_payload_size.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
caches:
- name: "longterm"
mode: "file_system"
max_payload_size: "-10B"
file_system:
dir: "cache_dir"
max_size: 100Gb

server:
http:
listen_addr: ":8080"

users:
- name: "dummy"
allowed_networks: ["1.2.3.4"]
to_cluster: "cluster"
to_user: "default"

clusters:
- name: "cluster"
nodes: ["127.0.1.1:8123"]
3 changes: 3 additions & 0 deletions config/testdata/full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ caches:
# Path to directory where cached responses will be stored.
dir: "/path/to/longterm/cachedir"

max_payload_size: 100Gb

# Expiration time for cached responses.
expire: 1h

Expand All @@ -44,6 +46,7 @@ caches:
file_system:
max_size: 100Mb
dir: "/path/to/shortterm/cachedir"
max_payload_size: 100Mb
expire: 10s

# Optional network lists, might be used as values for `allowed_networks`.
Expand Down
5 changes: 4 additions & 1 deletion docs/content/en/configuration/caching.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ Distributed cache relies on external database to share cache across multiple rep
multiple replicas deployments. Currently only [Redis](https://redis.io/) key value store is supported.
Configuration template for distributed cache can be found [here](https://github.com/ContentSquare/chproxy/blob/master/config/#distributed_cache_config).


#### Response limitations for caching
Before caching Clickhouse response, chproxy verifies that the response size
is not greater than configured max size. This setting can be specified in config section of the cache `max_payload_size`. The default value
is set to 1 Petabyte. Therefore, by default this security mechanism is disabled.
87 changes: 87 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,85 @@ func TestServe(t *testing.T) {
},
startTLS,
},
{
"https cache max payload size",
"testdata/https.cache.max-payload-size.yml",
func(t *testing.T) {
q := "SELECT MaxPayloadSize"
req, err := http.NewRequest("GET", "https://127.0.0.1:8443?query="+url.QueryEscape(q), nil)
checkErr(t, err)
req.SetBasicAuth("default", "qwerty")
req.Close = true

resp, err := tlsClient.Do(req)
checkErr(t, err)
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK)
}

checkResponse(t, resp.Body, expectedOkResp)

key := &cache.Key{
Query: []byte(q),
AcceptEncoding: "gzip",
Version: cache.Version,
}

cc := proxy.caches["https_cache_max_payload_size"]
cachedData, err := cc.Get(key)

if cachedData != nil || err == nil {
t.Fatal("response bigger than maxPayloadSize should not be cached")
}

resp.Body.Close()
},
startTLS,
},
{
"https cache max payload size not reached",
"testdata/https.cache.max-payload-size-not-reached.yml",
func(t *testing.T) {
q := "SELECT MaxPayloadSize"
req, err := http.NewRequest("GET", "https://127.0.0.1:8443?query="+url.QueryEscape(q), nil)
checkErr(t, err)
req.SetBasicAuth("default", "qwerty")
req.Close = true

resp, err := tlsClient.Do(req)
checkErr(t, err)
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK)
}

checkResponse(t, resp.Body, expectedOkResp)

key := &cache.Key{
Query: []byte(q),
AcceptEncoding: "gzip",
Version: cache.Version,
}

rw := httptest.NewRecorder()

cc := proxy.caches["https_cache_max_payload_size"]
cachedData, err := cc.Get(key)

if err != nil {
t.Fatalf("unexpected error while getting response from cache: %s", err)
}

err = RespondWithData(rw, cachedData.Data, cachedData.ContentMetadata, cachedData.Ttl, 200)
if err != nil {
t.Fatalf("unexpected error while getting response from cache: %s", err)
}
checkResponse(t, rw.Body, expectedOkResp)

cachedData.Data.Close()
resp.Body.Close()
},
startTLS,
},
{
"https cache with mix query source",
"testdata/https.cache.yml",
Expand Down Expand Up @@ -794,6 +873,14 @@ func fakeCHHandler(w http.ResponseWriter, r *http.Request) {
case q == "SELECT 1 FORMAT TabSeparatedWithNamesAndTypes":
w.WriteHeader(http.StatusOK)
w.Write(bytesWithInvalidUTFPairs)
case q == "SELECT MaxPayloadSize":
w.WriteHeader(http.StatusOK)

// generate 10M payload size
b := make([]byte, 10485760)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
fmt.Fprint(w, b)
fmt.Fprint(w, "Ok.\n")
default:
if strings.Contains(string(query), killQueryPattern) {
fakeCHState.kill()
Expand Down
9 changes: 8 additions & 1 deletion metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ var (
},
[]string{"cache"},
)
cacheSkipped = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "cache_payloadsize_too_big_total",
Help: "The amount of too big payloads to be cached",
},
[]string{"cache", "user", "cluster", "cluster_user"},
)
requestDuration = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "request_duration_seconds",
Expand Down Expand Up @@ -194,7 +201,7 @@ func registerMetrics() {
limitExcess, hostPenalties, hostHealth, concurrentQueries,
requestQueueSize, userQueueOverflow, clusterUserQueueOverflow,
requestBodyBytes, responseBodyBytes,
cacheHit, cacheMiss, cacheSize, cacheItems,
cacheHit, cacheMiss, cacheSize, cacheItems, cacheSkipped,
requestDuration, proxiedResponseDuration, cachedResponseDuration,
canceledRequest, timeoutRequest,
configSuccess, configSuccessTime, badRequest)
Expand Down
15 changes: 14 additions & 1 deletion proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,22 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h
if err != nil {
err = fmt.Errorf("%s: %w; query: %q", s, err, q)
respondWith(srw, err, http.StatusInternalServerError)
return
}
} else {
// Do not cache responses greater than max payload size.
if contentLength > int64(s.user.cache.MaxPayloadSize) {
cacheSkipped.With(labels).Inc()
log.Infof("%s: Request will not be cached. Content length (%d) is greater than max payload size (%d)", s, contentLength, s.user.cache.MaxPayloadSize)

rp.completeTransaction(s, statusCode, userCache, key, q)

err = RespondWithData(srw, reader, contentMetadata, 0*time.Second, tmpFileRespWriter.StatusCode())
if err != nil {
err = fmt.Errorf("%s: %w; query: %q", s, err, q)
respondWith(srw, err, http.StatusInternalServerError)
}
return
}
cacheMiss.With(labels).Inc()
log.Debugf("%s: cache miss", s)
expiration, err := userCache.Put(reader, contentMetadata, key)
Expand Down
29 changes: 29 additions & 0 deletions proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"crypto/tls"
"fmt"
"github.com/contentsquare/chproxy/cache"
"io"
"math/rand"
"net"
Expand Down Expand Up @@ -249,6 +250,19 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
return makeRequest(p)
},
},
{
cfg: goodCfg,
name: "max payload size limit",
expResponse: okResponse,
expStatusCode: http.StatusOK,
f: func(p *reverseProxy) *http.Response {
p.caches["max_payload_size"] = &cache.AsyncCache{
MaxPayloadSize: 8 * 1024 * 1024,
}
p.users["default"].cache = p.caches["max_payload_size"]
return makeRequest(p)
},
},
{
cfg: goodCfg,
name: "queue overflow for user",
Expand Down Expand Up @@ -397,6 +411,21 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
return makeCustomRequest(p, req)
},
},
{
cfg: authCfg,
name: "post request max payload size",
expResponse: okResponse,
expStatusCode: http.StatusOK,
f: func(p *reverseProxy) *http.Response {
uri := fmt.Sprintf("%s?user=foo&password=bar", fakeServer.URL)
req := httptest.NewRequest("POST", uri, nil)
p.caches["max_payload_size"] = &cache.AsyncCache{
MaxPayloadSize: 8 * 1024 * 1024,
}
p.users["foo"].cache = p.caches["max_payload_size"]
return makeCustomRequest(p, req)
},
},
}

for _, tc := range testCases {
Expand Down
Loading

0 comments on commit d257a95

Please sign in to comment.