max payload size for caching
sigua-cs committed Jul 19, 2022
1 parent 6d45135 commit c6ec7c6
Showing 9 changed files with 86 additions and 22 deletions.
5 changes: 5 additions & 0 deletions cache/async_cache.go
@@ -20,6 +20,8 @@ type AsyncCache struct {
TransactionRegistry

graceTime time.Duration

MaxPayloadSize int64
}

func (c *AsyncCache) Close() error {
@@ -103,9 +105,12 @@ func NewAsyncCache(cfg config.Cache, maxExecutionTime time.Duration) (*AsyncCach
return nil, err
}

maxPayloadSize := int64(cfg.MaxPayloadSize)

return &AsyncCache{
Cache: cache,
TransactionRegistry: transaction,
graceTime: graceTime,
MaxPayloadSize: maxPayloadSize,
}, nil
}
2 changes: 2 additions & 0 deletions cache/async_cache_test.go
@@ -214,6 +214,7 @@ func TestAsyncCache_FilesystemCache_instantiation(t *testing.T) {
MaxSize: 8192,
},
Expire: config.Duration(time.Minute),
MaxPayloadSize: config.ByteSize(100000000),
}
if err := os.RemoveAll(testDirAsync); err != nil {
log.Fatalf("cannot remove %q: %s", testDirAsync, err)
@@ -249,6 +250,7 @@ func TestAsyncCache_RedisCache_instantiation(t *testing.T) {
Addresses: []string{s.Addr()},
},
Expire: config.Duration(cacheTTL),
MaxPayloadSize: config.ByteSize(100000000),
}

_, err := NewAsyncCache(redisCfg, 1*time.Second)
12 changes: 12 additions & 0 deletions config/config.go
@@ -32,6 +32,8 @@
}

defaultExecutionTime = Duration(30 * time.Second)

defaultMaxPayloadSize = ByteSize(100000000)
)

// Config describes server configuration, access and proxy rules
@@ -609,6 +611,9 @@ type Cache struct {

// Catches all undefined fields
XXX map[string]interface{} `yaml:",inline"`

// Maximum total size of request payload for caching
MaxPayloadSize ByteSize `yaml:"max_payload_size,omitempty"`
}

type FileSystemCacheConfig struct {
@@ -820,6 +825,13 @@ func LoadFile(filename string) (*Config, error) {
}
}

for i := range cfg.Caches {
c := &cfg.Caches[i]
if c.MaxPayloadSize <= 0 {
c.MaxPayloadSize = defaultMaxPayloadSize
}
}

if maxResponseTime < 0 {
maxResponseTime = 0
}
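For reference, a self-contained sketch of the defaulting behaviour added to LoadFile above, using simplified stand-ins for config.Cache and config.ByteSize (only the fields this commit touches): a cache whose max_payload_size is unset or non-positive falls back to the 100000000-byte default.

```go
package main

import "fmt"

// Simplified stand-ins for config.ByteSize and config.Cache, reduced to the
// fields relevant to this change; the real types live in the config package.
type ByteSize int64

type Cache struct {
	Name           string
	MaxPayloadSize ByteSize
}

// defaultMaxPayloadSize mirrors the new default added to config.go.
const defaultMaxPayloadSize = ByteSize(100000000)

// applyDefaults mirrors the loop added to LoadFile: any cache whose
// max_payload_size is unset or non-positive gets the default.
func applyDefaults(caches []Cache) {
	for i := range caches {
		c := &caches[i]
		if c.MaxPayloadSize <= 0 {
			c.MaxPayloadSize = defaultMaxPayloadSize
		}
	}
}

func main() {
	caches := []Cache{
		{Name: "longterm", MaxPayloadSize: ByteSize(100 << 30)}, // explicit 100Gb
		{Name: "shortterm"},                                     // omitted, gets the default
	}
	applyDefaults(caches)
	for _, c := range caches {
		fmt.Printf("%s: %d bytes\n", c.Name, c.MaxPayloadSize)
	}
}
```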
15 changes: 12 additions & 3 deletions config/config_test.go
@@ -21,8 +21,9 @@ var fullConfig = Config{
Dir: "/path/to/longterm/cachedir",
MaxSize: ByteSize(100 << 30),
},
Expire: Duration(time.Hour),
GraceTime: Duration(20 * time.Second),
Expire: Duration(time.Hour),
GraceTime: Duration(20 * time.Second),
MaxPayloadSize: ByteSize(100 << 30),
},
{
Name: "shortterm",
@@ -31,7 +32,8 @@
Dir: "/path/to/shortterm/cachedir",
MaxSize: ByteSize(100 << 20),
},
Expire: Duration(10 * time.Second),
Expire: Duration(10 * time.Second),
MaxPayloadSize: ByteSize(100 << 20),
},
},
HackMePlease: true,
@@ -452,6 +454,11 @@ func TestBadConfig(t *testing.T) {
"testdata/bad.heartbeat_section.empty2.yml",
"cannot be use `heartbeat_interval` with `heartbeat` section",
},
{
"max payload size to cache",
"testdata/bad.max_payload_size.yml",
"cannot parse byte size \"-10B\": it must be positive float followed by optional units. For example, 1Gb, 100Mb",
},
}

for _, tc := range testCases {
@@ -827,12 +834,14 @@ caches:
file_system:
dir: /path/to/longterm/cachedir
max_size: 107374182400
max_payload_size: 107374182400
- mode: file_system
name: shortterm
expire: 10s
file_system:
dir: /path/to/shortterm/cachedir
max_size: 104857600
max_payload_size: 104857600
param_groups:
- name: cron-job
params:
21 changes: 21 additions & 0 deletions config/testdata/bad.max_payload_size.yml
@@ -0,0 +1,21 @@
caches:
- name: "longterm"
mode: "file_system"
max_payload_size: "-10B"
file_system:
dir: "cache_dir"
max_size: 100Gb

server:
http:
listen_addr: ":8080"

users:
- name: "dummy"
allowed_networks: ["1.2.3.4"]
to_cluster: "cluster"
to_user: "default"

clusters:
- name: "cluster"
nodes: ["127.0.1.1:8123"]
3 changes: 3 additions & 0 deletions config/testdata/full.yml
@@ -26,6 +26,8 @@ caches:
# Path to directory where cached responses will be stored.
dir: "/path/to/longterm/cachedir"

max_payload_size: 100Gb

# Expiration time for cached responses.
expire: 1h

@@ -44,6 +46,7 @@
file_system:
max_size: 100Mb
dir: "/path/to/shortterm/cachedir"
max_payload_size: 100Mb
expire: 10s

# Optional network lists, might be used as values for `allowed_networks`.
41 changes: 24 additions & 17 deletions proxy.go
@@ -339,25 +339,32 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h
contentLength := bufferedRespWriter.GetCapturedContentLength()
reader := bufferedRespWriter.Reader()

// we create this buffer to be able to stream data both to cache as well as to an end user
var buf bytes.Buffer
tee := io.TeeReader(reader, &buf)
contentMetadata := cache.ContentMetadata{Length: contentLength, Encoding: contentEncoding, Type: contentType}
expiration, err := userCache.Put(tee, contentMetadata, key)
if err != nil {
log.Errorf("%s: %s; query: %q - failed to put response in the cache", s, err, q)
}
if isToCache(contentLength, s) {
// we create this buffer to be able to stream data both to cache as well as to an end user
var buf bytes.Buffer
tee := io.TeeReader(reader, &buf)
contentMetadata := cache.ContentMetadata{Length: contentLength, Encoding: contentEncoding, Type: contentType}
expiration, err := userCache.Put(tee, contentMetadata, key)
if err != nil {
log.Errorf("%s: %s; query: %q - failed to put response in the cache", s, err, q)
}

// mark transaction as completed
if err = userCache.Complete(key); err != nil {
log.Errorf("%s: %s; query: %q", s, err, q)
}
// mark transaction as completed
if err = userCache.Complete(key); err != nil {
log.Errorf("%s: %s; query: %q", s, err, q)
}

err = RespondWithData(srw, &buf, contentMetadata, expiration, bufferedRespWriter.StatusCode())
if err != nil {
err = fmt.Errorf("%s: %w; query: %q", s, err, q)
respondWith(srw, err, http.StatusInternalServerError)
return
err = RespondWithData(srw, &buf, contentMetadata, expiration, bufferedRespWriter.StatusCode())
if err != nil {
err = fmt.Errorf("%s: %w; query: %q", s, err, q)
respondWith(srw, err, http.StatusInternalServerError)
return
}
} else {
err = RespondWithoutData(srw)
if err != nil {
log.Errorf("%s: %s; query: %q - failed to put response in the cache", s, err, q)
}
}
}
}
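A minimal, self-contained sketch of the control flow this hunk introduces, with plain io.Reader/io.Writer values standing in for the buffered response, the cache, and the client: payloads at or below the limit are teed into a buffer so they reach both the cache and the client, while larger payloads skip the cache and are streamed straight to the client.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// serveWithCacheLimit is a simplified stand-in for the serveFromCache change:
// payloads within maxPayloadSize are streamed to both the cache and the
// client via io.TeeReader; larger payloads bypass the cache entirely.
func serveWithCacheLimit(resp io.Reader, length, maxPayloadSize int64, cache, client io.Writer) error {
	if length <= maxPayloadSize {
		var buf bytes.Buffer
		tee := io.TeeReader(resp, &buf)
		if _, err := io.Copy(cache, tee); err != nil {
			return fmt.Errorf("failed to put response in the cache: %w", err)
		}
		_, err := io.Copy(client, &buf)
		return err
	}
	// Too large to cache: send it straight to the client.
	_, err := io.Copy(client, resp)
	return err
}

func main() {
	body := "SELECT 1 result"
	var cacheBuf, clientBuf bytes.Buffer
	_ = serveWithCacheLimit(strings.NewReader(body), int64(len(body)), 1<<20, &cacheBuf, &clientBuf)
	fmt.Printf("cached=%q sent=%q\n", cacheBuf.String(), clientBuf.String())
}
```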
5 changes: 5 additions & 0 deletions utils.go
@@ -288,3 +288,8 @@ func calcMapHash(m map[string]string) (uint32, error) {
}
return h.Sum32(), nil
}

func isToCache(length int64, s *scope) bool {
maxPayloadSize := s.user.cache.MaxPayloadSize
return length <= maxPayloadSize
}
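A small table-driven sketch (not part of the commit) of the boundary behaviour of this predicate, using a local copy of the comparison rather than the unexported scope type: a payload exactly at the limit is still cached, one byte over is not.

```go
package main

import "testing"

// sizeWithinLimit mirrors the comparison in isToCache without depending on
// the unexported scope type.
func sizeWithinLimit(length, maxPayloadSize int64) bool {
	return length <= maxPayloadSize
}

func TestSizeWithinLimit(t *testing.T) {
	const limit = int64(100000000) // default max_payload_size
	testCases := []struct {
		name   string
		length int64
		want   bool
	}{
		{"well under limit", 1024, true},
		{"exactly at limit", limit, true},
		{"one byte over", limit + 1, false},
	}
	for _, tc := range testCases {
		if got := sizeWithinLimit(tc.length, limit); got != tc.want {
			t.Errorf("%s: got %v, want %v", tc.name, got, tc.want)
		}
	}
}
```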
4 changes: 2 additions & 2 deletions utils_test.go
@@ -354,13 +354,13 @@ func TestCalcMapHash(t *testing.T) {
},
{
"map with multiple value",
map[string]string{"param_table_name": "clients","param_database":"default"},
map[string]string{"param_table_name": "clients", "param_database": "default"},
0x6fddf04d,
nil,
},
{
"map with exchange value",
map[string]string{"param_database":"default","param_table_name":"clients"},
map[string]string{"param_database": "default", "param_table_name": "clients"},
0x6fddf04d,
nil,
},