Skip to content

Commit

Permalink
max payload size for caching
Browse files Browse the repository at this point in the history
  • Loading branch information
sigua-cs committed Sep 7, 2022
1 parent b459dcc commit c65a989
Show file tree
Hide file tree
Showing 15 changed files with 364 additions and 8 deletions.
5 changes: 5 additions & 0 deletions cache/async_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type AsyncCache struct {
TransactionRegistry

graceTime time.Duration

MaxPayloadSize config.ByteSize
}

func (c *AsyncCache) Close() error {
Expand Down Expand Up @@ -103,9 +105,12 @@ func NewAsyncCache(cfg config.Cache, maxExecutionTime time.Duration) (*AsyncCach
return nil, err
}

maxPayloadSize := cfg.MaxPayloadSize

return &AsyncCache{
Cache: cache,
TransactionRegistry: transaction,
graceTime: graceTime,
MaxPayloadSize: maxPayloadSize,
}, nil
}
6 changes: 4 additions & 2 deletions cache/async_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ func TestAsyncCache_FilesystemCache_instantiation(t *testing.T) {
Dir: asyncTestDir,
MaxSize: 8192,
},
Expire: config.Duration(time.Minute),
Expire: config.Duration(time.Minute),
MaxPayloadSize: config.ByteSize(100000000),
}
if err := os.RemoveAll(testDirAsync); err != nil {
log.Fatalf("cannot remove %q: %s", testDirAsync, err)
Expand Down Expand Up @@ -248,7 +249,8 @@ func TestAsyncCache_RedisCache_instantiation(t *testing.T) {
Redis: config.RedisCacheConfig{
Addresses: []string{s.Addr()},
},
Expire: config.Duration(cacheTTL),
Expire: config.Duration(cacheTTL),
MaxPayloadSize: config.ByteSize(100000000),
}

_, err := NewAsyncCache(redisCfg, 1*time.Second)
Expand Down
9 changes: 9 additions & 0 deletions config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ expire: <duration>
# By default `grace_time` is 5s. Negative value disables the protection
# from `thundering herd` problem.
grace_time: <duration>

# Maximum total size of a request payload eligible for caching. The default
# value is 1 Petabyte.
max_payload_size: <byte_size>
```
### <distributed_cache_config>
Expand Down Expand Up @@ -113,6 +117,11 @@ expire: <duration>
# By default `grace_time` is 5s. Negative value disables the protection
# from `thundering herd` problem.
grace_time: <duration>

# Maximum total size of request payload for caching. The default value
# is set to 1 Petabyte.
# The default is set this high so that users who do not limit response sizes get a virtually unlimited cache.
max_payload_size: <byte_size>
```
### <param_groups_config>
Expand Down
12 changes: 12 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ var (
}

defaultExecutionTime = Duration(120 * time.Second)

defaultMaxPayloadSize = ByteSize(1 << 50)
)

// Config describes server configuration, access and proxy rules
Expand Down Expand Up @@ -601,6 +603,9 @@ type Cache struct {

// Catches all undefined fields
XXX map[string]interface{} `yaml:",inline"`

// Maximum total size of request payload for caching
MaxPayloadSize ByteSize `yaml:"max_payload_size,omitempty"`
}

type FileSystemCacheConfig struct {
Expand Down Expand Up @@ -812,6 +817,13 @@ func LoadFile(filename string) (*Config, error) {
}
}

for i := range cfg.Caches {
c := &cfg.Caches[i]
if c.MaxPayloadSize <= 0 {
c.MaxPayloadSize = defaultMaxPayloadSize
}
}

if maxResponseTime < 0 {
maxResponseTime = 0
}
Expand Down
15 changes: 12 additions & 3 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ var fullConfig = Config{
Dir: "/path/to/longterm/cachedir",
MaxSize: ByteSize(100 << 30),
},
Expire: Duration(time.Hour),
GraceTime: Duration(20 * time.Second),
Expire: Duration(time.Hour),
GraceTime: Duration(20 * time.Second),
MaxPayloadSize: ByteSize(100 << 30),
},
{
Name: "shortterm",
Expand All @@ -31,7 +32,8 @@ var fullConfig = Config{
Dir: "/path/to/shortterm/cachedir",
MaxSize: ByteSize(100 << 20),
},
Expire: Duration(10 * time.Second),
Expire: Duration(10 * time.Second),
MaxPayloadSize: ByteSize(100 << 20),
},
},
HackMePlease: true,
Expand Down Expand Up @@ -441,6 +443,11 @@ func TestBadConfig(t *testing.T) {
"testdata/bad.heartbeat_section.empty.yml",
"`cluster.heartbeat` cannot be unset for \"cluster\"",
},
{
"max payload size to cache",
"testdata/bad.max_payload_size.yml",
"cannot parse byte size \"-10B\": it must be positive float followed by optional units. For example, 1.5Gb, 3T",
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -815,12 +822,14 @@ caches:
file_system:
dir: /path/to/longterm/cachedir
max_size: 107374182400
max_payload_size: 107374182400
- mode: file_system
name: shortterm
expire: 10s
file_system:
dir: /path/to/shortterm/cachedir
max_size: 104857600
max_payload_size: 104857600
param_groups:
- name: cron-job
params:
Expand Down
21 changes: 21 additions & 0 deletions config/testdata/bad.max_payload_size.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
caches:
- name: "longterm"
mode: "file_system"
max_payload_size: "-10B"
file_system:
dir: "cache_dir"
max_size: 100Gb

server:
http:
listen_addr: ":8080"

users:
- name: "dummy"
allowed_networks: ["1.2.3.4"]
to_cluster: "cluster"
to_user: "default"

clusters:
- name: "cluster"
nodes: ["127.0.1.1:8123"]
3 changes: 3 additions & 0 deletions config/testdata/full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ caches:
# Path to directory where cached responses will be stored.
dir: "/path/to/longterm/cachedir"

max_payload_size: 100Gb

# Expiration time for cached responses.
expire: 1h

Expand All @@ -44,6 +46,7 @@ caches:
file_system:
max_size: 100Mb
dir: "/path/to/shortterm/cachedir"
max_payload_size: 100Mb
expire: 10s

# Optional network lists, might be used as values for `allowed_networks`.
Expand Down
5 changes: 4 additions & 1 deletion docs/content/en/configuration/caching.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ Distributed cache relies on external database to share cache across multiple rep
multiple replicas deployments. Currently only [Redis](https://redis.io/) key value store is supported.
Configuration template for distributed cache can be found [here](https://github.com/ContentSquare/chproxy/blob/master/config/#distributed_cache_config).


#### Response limitations for caching
Before caching a ClickHouse response, chproxy verifies that the response size
is not greater than the configured maximum. This limit can be set with the `max_payload_size` option in the cache's config section. The default value
is 1 Petabyte, so by default this safety mechanism is effectively disabled.
168 changes: 168 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,90 @@ func TestServe(t *testing.T) {
},
startTLS,
},
{
"https cache max payload size",
"testdata/https.cache.max-payload-size.yml",
func(t *testing.T) {
q := "SELECT MaxPayloadSize"
req, err := http.NewRequest("GET", "https://127.0.0.1:8443?query="+url.QueryEscape(q), nil)
checkErr(t, err)
req.SetBasicAuth("default", "qwerty")
req.Close = true

resp, err := tlsClient.Do(req)
checkErr(t, err)
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK)
}

checkResponse(t, resp.Body, expectedOkResp)

key := &cache.Key{
Query: []byte(q),
AcceptEncoding: "gzip",
Version: cache.Version,
}

cc := proxy.caches["https_cache_max_payload_size"]
cachedData, err := cc.Get(key)

if cachedData != nil || err == nil {
t.Fatal("skipped response from cache is expected")
}

resp.Body.Close()
},
startTLS,
},
{
"https cache max payload size not reached",
"testdata/https.cache.max-payload-size-not-reached.yml",
func(t *testing.T) {
q := "SELECT MaxPayloadSize"
req, err := http.NewRequest("GET", "https://127.0.0.1:8443?query="+url.QueryEscape(q), nil)
checkErr(t, err)
req.SetBasicAuth("default", "qwerty")
req.Close = true

resp, err := tlsClient.Do(req)
checkErr(t, err)
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK)
}

checkResponse(t, resp.Body, expectedOkResp)

key := &cache.Key{
Query: []byte(q),
AcceptEncoding: "gzip",
Version: cache.Version,
}

path := fmt.Sprintf("%s/cache/%s", testDir, key.String())
if _, err := os.Stat(path); err != nil {
t.Fatalf("err while getting file %q info: %s", path, err)
}

rw := httptest.NewRecorder()

cc := proxy.caches["https_cache_max_payload_size"]
cachedData, err := cc.Get(key)

if err != nil {
t.Fatalf("unexpected error while getting response from cache: %s", err)
}

err = RespondWithData(rw, cachedData.Data, cachedData.ContentMetadata, cachedData.Ttl, 200)
if err != nil {
t.Fatalf("unexpected error while getting response from cache: %s", err)
}
checkResponse(t, rw.Body, expectedOkResp)

cachedData.Data.Close()
resp.Body.Close()
},
startTLS,
},
{
"https cache with mix query source",
"testdata/https.cache.yml",
Expand Down Expand Up @@ -339,6 +423,82 @@ func TestServe(t *testing.T) {
},
startHTTP,
},
{
"http cache max payload size",
"testdata/http.cache.max-payload-size.yml",
func(t *testing.T) {
q := "SELECT MaxPayloadSize"
req, err := http.NewRequest("GET", "http://127.0.0.1:9090?query="+url.QueryEscape(q), nil)
checkErr(t, err)
req.SetBasicAuth("default", "qwerty")
req.Close = true

resp, err := httpRequest(t, req, http.StatusOK)
checkResponse(t, resp.Body, expectedOkResp)
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK)
}

key := &cache.Key{
Query: []byte(q),
AcceptEncoding: "gzip",
Version: cache.Version,
}

cc := proxy.caches["http_cache_max_payload_size"]
cachedData, err := cc.Get(key)

if cachedData != nil || err == nil {
t.Fatal("skipped response from cache is expected")
}

resp.Body.Close()
},
startHTTP,
},
{
"http cache max payload size not reached",
"testdata/http.cache.max-payload-size-not-reached.yml",
func(t *testing.T) {
q := "SELECT MaxPayloadSize"
req, err := http.NewRequest("GET", "http://127.0.0.1:9090?query="+url.QueryEscape(q), nil)
checkErr(t, err)
req.SetBasicAuth("default", "qwerty")
req.Close = true

resp, err := httpRequest(t, req, http.StatusOK)
checkResponse(t, resp.Body, expectedOkResp)
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK)
}

key := &cache.Key{
Query: []byte(q),
AcceptEncoding: "gzip",
Version: cache.Version,
}

cc := proxy.caches["http_cache_max_payload_size"]
cachedData, err := cc.Get(key)

if err != nil {
t.Fatalf("unexpected error while getting response from cache: %s", err)
}

rw := httptest.NewRecorder()

err = RespondWithData(rw, cachedData.Data, cachedData.ContentMetadata, cachedData.Ttl, 200)
if err != nil {
t.Fatalf("unexpected error while getting response from cache: %s", err)
}
checkResponse(t, rw.Body, expectedOkResp)

cachedData.Data.Close()

resp.Body.Close()
},
startHTTP,
},
{
"http requests with caching in redis ",
"testdata/http.cache.redis.yml",
Expand Down Expand Up @@ -794,6 +954,14 @@ func fakeCHHandler(w http.ResponseWriter, r *http.Request) {
case q == "SELECT 1 FORMAT TabSeparatedWithNamesAndTypes":
w.WriteHeader(http.StatusOK)
w.Write(bytesWithInvalidUTFPairs)
case q == "SELECT MaxPayloadSize":
w.WriteHeader(http.StatusOK)

// generate 10M payload size
b := make([]byte, 10485760)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
fmt.Fprint(w, b)
fmt.Fprint(w, "Ok.\n")
default:
if strings.Contains(string(query), killQueryPattern) {
fakeCHState.kill()
Expand Down
Loading

0 comments on commit c65a989

Please sign in to comment.