Skip to content

Commit

Permalink
max_error_reason_size limit error reason (#386)
Browse files Browse the repository at this point in the history
* max_error_reason_size limit error reason

* fix tests

* fix tests

* fix tests

* fix tests

* max_error_reason_size to 5MB

---------

Co-authored-by: k.torgaev <[email protected]>
  • Loading branch information
kasimtj and k.torgaev authored Jan 16, 2024
1 parent ff8f294 commit b4a6f2a
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 15 deletions.
5 changes: 5 additions & 0 deletions config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ param_groups:
# Named network lists
network_groups: <network_groups_config> ... [optional]

# Maximum total size of fail reason of queries. Config prevents large tmp files from being read into memory, affects only cachable queries
# The default value is set to 1 Petabyte.
# If error reason exceeds limit "unknown error reason" will be stored as a fail reason
max_error_reason_size: <byte_size>

# Settings for connection pool to ClickHouse
connection_pool:
max_idle_conns: 100
Expand Down
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ var (

defaultMaxPayloadSize = ByteSize(1 << 50)

defaultMaxErrorReasonSize = ByteSize(1 << 50)

defaultRetryNumber = 0
)

Expand All @@ -67,6 +69,9 @@ type Config struct {

NetworkGroups []NetworkGroups `yaml:"network_groups,omitempty"`

// Maximum size of error payload
MaxErrorReasonSize ByteSize `yaml:"max_error_reason_size,omitempty"`

Caches []Cache `yaml:"caches,omitempty"`

ParamGroups []ParamGroup `yaml:"param_groups,omitempty"`
Expand Down Expand Up @@ -188,6 +193,10 @@ func (cfg *Config) setDefaults() error {
c.setDefaults()
}

if cfg.MaxErrorReasonSize <= 0 {
cfg.MaxErrorReasonSize = defaultMaxErrorReasonSize
}

cfg.setServerMaxResponseTime(maxResponseTime)

return nil
Expand Down
10 changes: 9 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ var fullConfig = Config{
},
},
},
networkReg: map[string]Networks{},
MaxErrorReasonSize: ByteSize(100 << 20),
networkReg: map[string]Networks{},
}

func TestLoadConfig(t *testing.T) {
Expand Down Expand Up @@ -319,6 +320,7 @@ func TestLoadConfig(t *testing.T) {
MaxExecutionTime: Duration(120 * time.Second),
},
},
MaxErrorReasonSize: ByteSize(1 << 50),
},
},
}
Expand Down Expand Up @@ -505,6 +507,11 @@ func TestBadConfig(t *testing.T) {
"testdata/bad.proxy_settings.yml",
"`proxy_header` cannot be set without enabling proxy settings",
},
{
"max error reason size",
"testdata/bad.max_error_reason_size.yml",
"cannot parse byte size \"-10B\": it must be positive float followed by optional units. For example, 1.5Gb, 3T",
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -880,6 +887,7 @@ network_groups:
- name: reporting-apps
networks:
- 10.10.10.0/24
max_error_reason_size: 104857600
caches:
- mode: file_system
name: longterm
Expand Down
21 changes: 21 additions & 0 deletions config/testdata/bad.max_error_reason_size.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
caches:
- name: "longterm"
mode: "file_system"
file_system:
dir: "cache_dir"
max_size: 100Gb

server:
http:
listen_addr: ":8080"

users:
- name: "dummy"
allowed_networks: ["1.2.3.4"]
to_cluster: "cluster"
to_user: "default"

clusters:
- name: "cluster"
nodes: ["127.0.1.1:8123"]
max_error_reason_size: "-10B"
2 changes: 2 additions & 0 deletions config/testdata/full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ network_groups:
- name: "reporting-apps"
networks: ["10.10.10.0/24"]

max_error_reason_size: 100Mb

# Optional lists of query params to send with each proxied request to ClickHouse.
# These lists may be used for overriding ClickHouse settings on a per-user basis.
param_groups:
Expand Down
4 changes: 4 additions & 0 deletions docs/src/content/docs/configuration/default.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ network_groups:
- name: "reporting-apps"
networks: ["10.10.10.0/24"]

# Maximum total size of fail reason of queries. Config prevents large tmp files from being read into memory, affects only cachable queries
# If error reason exceeds limit "unknown error reason" will be stored as a fail reason
max_error_reason_size: 100GB

# Optional lists of query params to send with each proxied request to ClickHouse.
# These lists may be used for overriding ClickHouse settings on a per-user basis.
param_groups:
Expand Down
17 changes: 13 additions & 4 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type reverseProxy struct {
hasWildcarded bool
maxIdleConns int
maxIdleConnsPerHost int
maxErrorReasonSize int64
}

func newReverseProxy(cfgCp *config.ConnectionPool) *reverseProxy {
Expand Down Expand Up @@ -451,12 +452,18 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h
tmpFileRespWriter.WriteHeader(srw.statusCode)
}

errString, err := toString(reader)
if err != nil {
log.Errorf("%s failed to get error reason: %s", s, err.Error())
errReason := "unknown error reason"
if contentLength > rp.maxErrorReasonSize {
log.Infof("%s: Error reason length (%d) is greater than max error reason size (%d)", s, contentLength, rp.maxErrorReasonSize)
} else {
errString, err := toString(reader)
if err != nil {
log.Errorf("%s failed to get error reason: %s", s, err.Error())
}

errReason = fmt.Sprintf("%s %s", failedTransactionPrefix, errString)
}

errReason := fmt.Sprintf("%s %s", failedTransactionPrefix, errString)
rp.completeTransaction(s, statusCode, userCache, key, q, errReason)

// we need to reset the offset since the reader of tmpFileRespWriter was already
Expand Down Expand Up @@ -622,6 +629,8 @@ func (rp *reverseProxy) applyConfig(cfg *config.Config) error {
return err
}

rp.maxErrorReasonSize = int64(cfg.MaxErrorReasonSize)

caches := make(map[string]*cache.AsyncCache, len(cfg.Caches))
defer func() {
// caches is swapped with old caches from rp.caches
Expand Down
92 changes: 82 additions & 10 deletions proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"crypto/tls"
"fmt"
"hash/fnv"
"io"
"math/rand"
"net"
Expand Down Expand Up @@ -41,7 +42,10 @@ const (
okResponse = "1"
badGatewayResponse = "]: cannot reach 127.0.0.1:"
)
const testCacheDir = "./test-cache-data"
const (
testCacheDir = "./test-cache-data"
fileSystemCache = "file_system_cache"
)

var goodCfg = &config.Config{
Server: config.Server{
Expand Down Expand Up @@ -97,15 +101,56 @@ var goodCfgWithCache = &config.Config{
Name: defaultUsername,
ToCluster: "cluster",
ToUser: "web",
Cache: "file_system_cache",
Cache: fileSystemCache,
},
},
ParamGroups: []config.ParamGroup{
{Name: "param_test", Params: []config.Param{{Key: "param_key", Value: "param_value"}}},
},
Caches: []config.Cache{
{
Name: "file_system_cache",
Name: fileSystemCache,
Mode: "file_system",
FileSystem: config.FileSystemCacheConfig{
Dir: testCacheDir,
MaxSize: config.ByteSize(1024 * 1024),
},
Expire: config.Duration(1000 * 60 * 60),
},
},
MaxErrorReasonSize: config.ByteSize(100 << 20),
}
var goodCfgWithCacheAndMaxErrorReasonSize = &config.Config{
Clusters: []config.Cluster{
{
Name: "cluster",
Scheme: "http",
Replicas: []config.Replica{
{
Nodes: []string{"localhost:8123"},
},
},
ClusterUsers: []config.ClusterUser{
{
Name: "web",
},
},
},
},
Users: []config.User{
{
Name: defaultUsername,
ToCluster: "cluster",
ToUser: "web",
Cache: fileSystemCache,
},
},
ParamGroups: []config.ParamGroup{
{Name: "param_test", Params: []config.Param{{Key: "param_key", Value: "param_value"}}},
},
Caches: []config.Cache{
{
Name: fileSystemCache,
Mode: "file_system",
FileSystem: config.FileSystemCacheConfig{
Dir: testCacheDir,
Expand Down Expand Up @@ -310,21 +355,32 @@ var fullWildcardedCfg = &config.Config{
},
}

func compareTransactionFailReason(t *testing.T, p *reverseProxy, user config.ClusterUser, query string, failReason string) {
h := fnv.New32a()
h.Write([]byte(user.Name + user.Password))
transactionKey := cache.NewKey([]byte(query), url.Values{"query": []string{query}}, "", 0, 0, h.Sum32())
transactionStatus, err := p.caches[fileSystemCache].TransactionRegistry.Status(transactionKey)
assert.Nil(t, err)
assert.Equal(t, failReason, transactionStatus.FailReason)
}

func TestReverseProxy_ServeHTTP1(t *testing.T) {
query := "SELECT123456"
testCases := []struct {
cfg *config.Config
name string
expResponse string
expStatusCode int
f func(p *reverseProxy) *http.Response
cfg *config.Config
name string
expResponse string
expStatusCode int
f func(p *reverseProxy) *http.Response
transactionFailReason string
}{
{
cfg: goodCfg,
name: "Bad gatway response without cache",
expResponse: badGatewayResponse,
expStatusCode: http.StatusBadGateway,
f: func(p *reverseProxy) *http.Response {
req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=SELECT123456", fakeServer.URL), nil)
req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=%s", fakeServer.URL, query), nil)
return makeCustomRequest(p, req)
},
},
Expand All @@ -334,11 +390,12 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
expResponse: badGatewayResponse,
expStatusCode: http.StatusBadGateway,
f: func(p *reverseProxy) *http.Response {
req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=SELECT123456", fakeServer.URL), nil)
req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=%s", fakeServer.URL, query), nil)
// cleaning the cache to be sure it will be a cache miss although the query isn't supposed to be cached
os.RemoveAll(testCacheDir)
return makeCustomRequest(p, req)
},
transactionFailReason: "[concurrent query failed] ]: cannot reach 127.0.0.1:\n",
},
{
cfg: goodCfg,
Expand Down Expand Up @@ -705,6 +762,17 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
return makeHeavyRequest(p, time.Millisecond*200)
},
},
{
cfg: goodCfgWithCacheAndMaxErrorReasonSize,
name: "max error reason size",
expResponse: badGatewayResponse,
expStatusCode: http.StatusBadGateway,
f: func(p *reverseProxy) *http.Response {
req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=%s", fakeServer.URL, query), nil)
return makeCustomRequest(p, req)
},
transactionFailReason: "unknown error reason",
},
}

for _, tc := range testCases {
Expand All @@ -717,6 +785,9 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
resp := tc.f(proxy)
b := bbToString(t, resp.Body)
resp.Body.Close()
if len(tc.cfg.Caches) != 0 {
compareTransactionFailReason(t, proxy, tc.cfg.Clusters[0].ClusterUsers[0], query, tc.transactionFailReason)
}
if !strings.Contains(b, tc.expResponse) {
t.Fatalf("expected response: %q; got: %q for %q", tc.expResponse, b, tc.name)
}
Expand Down Expand Up @@ -972,6 +1043,7 @@ var (
}
if r.URL.Path == "/badGateway" {
w.WriteHeader(http.StatusBadGateway)
fmt.Fprintln(w, badGatewayResponse)
return
}

Expand Down

0 comments on commit b4a6f2a

Please sign in to comment.