diff --git a/config/README.md b/config/README.md index 86045e56..d4dbf57c 100644 --- a/config/README.md +++ b/config/README.md @@ -27,6 +27,11 @@ param_groups: # Named network lists network_groups: ... [optional] +# Maximum total size of fail reason of queries. Config prevents large tmp files from being read into memory, affects only cachable queries +# The default value is set to 1 Petabyte. +# If error reason exceeds limit "unknown error reason" will be stored as a fail reason +max_error_reason_size: + # Settings for connection pool to ClickHouse connection_pool: max_idle_conns: 100 diff --git a/config/config.go b/config/config.go index 7887211f..659f2e76 100644 --- a/config/config.go +++ b/config/config.go @@ -48,6 +48,8 @@ var ( defaultMaxPayloadSize = ByteSize(1 << 50) + defaultMaxErrorReasonSize = ByteSize(1 << 50) + defaultRetryNumber = 0 ) @@ -67,6 +69,9 @@ type Config struct { NetworkGroups []NetworkGroups `yaml:"network_groups,omitempty"` + // Maximum size of error payload + MaxErrorReasonSize ByteSize `yaml:"max_error_reason_size,omitempty"` + Caches []Cache `yaml:"caches,omitempty"` ParamGroups []ParamGroup `yaml:"param_groups,omitempty"` @@ -188,6 +193,10 @@ func (cfg *Config) setDefaults() error { c.setDefaults() } + if cfg.MaxErrorReasonSize <= 0 { + cfg.MaxErrorReasonSize = defaultMaxErrorReasonSize + } + cfg.setServerMaxResponseTime(maxResponseTime) return nil diff --git a/config/config_test.go b/config/config_test.go index 53e2fef3..72e52b6a 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -262,7 +262,8 @@ var fullConfig = Config{ }, }, }, - networkReg: map[string]Networks{}, + MaxErrorReasonSize: ByteSize(100 << 20), + networkReg: map[string]Networks{}, } func TestLoadConfig(t *testing.T) { @@ -319,6 +320,7 @@ func TestLoadConfig(t *testing.T) { MaxExecutionTime: Duration(120 * time.Second), }, }, + MaxErrorReasonSize: ByteSize(1 << 50), }, }, } @@ -505,6 +507,11 @@ func TestBadConfig(t *testing.T) { 
"testdata/bad.proxy_settings.yml", "`proxy_header` cannot be set without enabling proxy settings", }, + { + "max error reason size", + "testdata/bad.max_error_reason_size.yml", + "cannot parse byte size \"-10B\": it must be positive float followed by optional units. For example, 1.5Gb, 3T", + }, } for _, tc := range testCases { @@ -880,6 +887,7 @@ network_groups: - name: reporting-apps networks: - 10.10.10.0/24 +max_error_reason_size: 104857600 caches: - mode: file_system name: longterm diff --git a/config/testdata/bad.max_error_reason_size.yml b/config/testdata/bad.max_error_reason_size.yml new file mode 100644 index 00000000..75b2e1f8 --- /dev/null +++ b/config/testdata/bad.max_error_reason_size.yml @@ -0,0 +1,21 @@ +caches: + - name: "longterm" + mode: "file_system" + file_system: + dir: "cache_dir" + max_size: 100Gb + +server: + http: + listen_addr: ":8080" + +users: + - name: "dummy" + allowed_networks: ["1.2.3.4"] + to_cluster: "cluster" + to_user: "default" + +clusters: + - name: "cluster" + nodes: ["127.0.1.1:8123"] +max_error_reason_size: "-10B" \ No newline at end of file diff --git a/config/testdata/full.yml b/config/testdata/full.yml index 293cca69..7018fb87 100644 --- a/config/testdata/full.yml +++ b/config/testdata/full.yml @@ -69,6 +69,8 @@ network_groups: - name: "reporting-apps" networks: ["10.10.10.0/24"] +max_error_reason_size: 100Mb + # Optional lists of query params to send with each proxied request to ClickHouse. # These lists may be used for overriding ClickHouse settings on a per-user basis. param_groups: diff --git a/docs/src/content/docs/configuration/default.md b/docs/src/content/docs/configuration/default.md index 80735473..cfe8f686 100644 --- a/docs/src/content/docs/configuration/default.md +++ b/docs/src/content/docs/configuration/default.md @@ -85,6 +85,10 @@ network_groups: - name: "reporting-apps" networks: ["10.10.10.0/24"] +# Maximum total size of fail reason of queries. 
This setting prevents large tmp files from being read into memory and affects only cacheable queries +# If the error reason exceeds the limit, "unknown error reason" will be stored as the fail reason +max_error_reason_size: 100GB + # Optional lists of query params to send with each proxied request to ClickHouse. # These lists may be used for overriding ClickHouse settings on a per-user basis. param_groups: diff --git a/proxy.go b/proxy.go index da22807f..9312a83f 100644 --- a/proxy.go +++ b/proxy.go @@ -48,6 +48,7 @@ type reverseProxy struct { hasWildcarded bool maxIdleConns int maxIdleConnsPerHost int + maxErrorReasonSize int64 } func newReverseProxy(cfgCp *config.ConnectionPool) *reverseProxy { @@ -451,12 +452,18 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h tmpFileRespWriter.WriteHeader(srw.statusCode) } - errString, err := toString(reader) - if err != nil { - log.Errorf("%s failed to get error reason: %s", s, err.Error()) + errReason := "unknown error reason" + if contentLength > rp.maxErrorReasonSize { + log.Infof("%s: Error reason length (%d) is greater than max error reason size (%d)", s, contentLength, rp.maxErrorReasonSize) + } else { + errString, err := toString(reader) + if err != nil { + log.Errorf("%s failed to get error reason: %s", s, err.Error()) + } + + errReason = fmt.Sprintf("%s %s", failedTransactionPrefix, errString) } - errReason := fmt.Sprintf("%s %s", failedTransactionPrefix, errString) rp.completeTransaction(s, statusCode, userCache, key, q, errReason) // we need to reset the offset since the reader of tmpFileRespWriter was already @@ -622,6 +629,8 @@ func (rp *reverseProxy) applyConfig(cfg *config.Config) error { return err } + rp.maxErrorReasonSize = int64(cfg.MaxErrorReasonSize) + caches := make(map[string]*cache.AsyncCache, len(cfg.Caches)) defer func() { // caches is swapped with old caches from rp.caches diff --git a/proxy_test.go b/proxy_test.go index 7c276ef0..43bd8e14 100644 --- a/proxy_test.go +++ b/proxy_test.go 
@@ -4,6 +4,7 @@ import ( "bytes" "crypto/tls" "fmt" + "hash/fnv" "io" "math/rand" "net" @@ -41,7 +42,10 @@ const ( okResponse = "1" badGatewayResponse = "]: cannot reach 127.0.0.1:" ) -const testCacheDir = "./test-cache-data" +const ( + testCacheDir = "./test-cache-data" + fileSystemCache = "file_system_cache" +) var goodCfg = &config.Config{ Server: config.Server{ @@ -97,7 +101,7 @@ var goodCfgWithCache = &config.Config{ Name: defaultUsername, ToCluster: "cluster", ToUser: "web", - Cache: "file_system_cache", + Cache: fileSystemCache, }, }, ParamGroups: []config.ParamGroup{ @@ -105,7 +109,48 @@ var goodCfgWithCache = &config.Config{ }, Caches: []config.Cache{ { - Name: "file_system_cache", + Name: fileSystemCache, + Mode: "file_system", + FileSystem: config.FileSystemCacheConfig{ + Dir: testCacheDir, + MaxSize: config.ByteSize(1024 * 1024), + }, + Expire: config.Duration(1000 * 60 * 60), + }, + }, + MaxErrorReasonSize: config.ByteSize(100 << 20), +} +var goodCfgWithCacheAndMaxErrorReasonSize = &config.Config{ + Clusters: []config.Cluster{ + { + Name: "cluster", + Scheme: "http", + Replicas: []config.Replica{ + { + Nodes: []string{"localhost:8123"}, + }, + }, + ClusterUsers: []config.ClusterUser{ + { + Name: "web", + }, + }, + }, + }, + Users: []config.User{ + { + Name: defaultUsername, + ToCluster: "cluster", + ToUser: "web", + Cache: fileSystemCache, + }, + }, + ParamGroups: []config.ParamGroup{ + {Name: "param_test", Params: []config.Param{{Key: "param_key", Value: "param_value"}}}, + }, + Caches: []config.Cache{ + { + Name: fileSystemCache, Mode: "file_system", FileSystem: config.FileSystemCacheConfig{ Dir: testCacheDir, @@ -310,13 +355,24 @@ var fullWildcardedCfg = &config.Config{ }, } +func compareTransactionFailReason(t *testing.T, p *reverseProxy, user config.ClusterUser, query string, failReason string) { + h := fnv.New32a() + h.Write([]byte(user.Name + user.Password)) + transactionKey := cache.NewKey([]byte(query), url.Values{"query": []string{query}}, 
"", 0, 0, h.Sum32()) + transactionStatus, err := p.caches[fileSystemCache].TransactionRegistry.Status(transactionKey) + assert.Nil(t, err) + assert.Equal(t, failReason, transactionStatus.FailReason) +} + func TestReverseProxy_ServeHTTP1(t *testing.T) { + query := "SELECT123456" testCases := []struct { - cfg *config.Config - name string - expResponse string - expStatusCode int - f func(p *reverseProxy) *http.Response + cfg *config.Config + name string + expResponse string + expStatusCode int + f func(p *reverseProxy) *http.Response + transactionFailReason string }{ { cfg: goodCfg, @@ -324,7 +380,7 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) { expResponse: badGatewayResponse, expStatusCode: http.StatusBadGateway, f: func(p *reverseProxy) *http.Response { - req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=SELECT123456", fakeServer.URL), nil) + req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=%s", fakeServer.URL, query), nil) return makeCustomRequest(p, req) }, }, @@ -334,11 +390,12 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) { expResponse: badGatewayResponse, expStatusCode: http.StatusBadGateway, f: func(p *reverseProxy) *http.Response { - req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=SELECT123456", fakeServer.URL), nil) + req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=%s", fakeServer.URL, query), nil) // cleaning the cache to be sure it will be a cache miss although the query isn't supposed to be cached os.RemoveAll(testCacheDir) return makeCustomRequest(p, req) }, + transactionFailReason: "[concurrent query failed] ]: cannot reach 127.0.0.1:\n", }, { cfg: goodCfg, @@ -705,6 +762,17 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) { return makeHeavyRequest(p, time.Millisecond*200) }, }, + { + cfg: goodCfgWithCacheAndMaxErrorReasonSize, + name: "max error reason size", + expResponse: badGatewayResponse, + expStatusCode: http.StatusBadGateway, + f: func(p *reverseProxy) 
*http.Response { + req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=%s", fakeServer.URL, query), nil) + return makeCustomRequest(p, req) + }, + transactionFailReason: "unknown error reason", + }, } for _, tc := range testCases { @@ -717,6 +785,9 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) { resp := tc.f(proxy) b := bbToString(t, resp.Body) resp.Body.Close() + if len(tc.cfg.Caches) != 0 { + compareTransactionFailReason(t, proxy, tc.cfg.Clusters[0].ClusterUsers[0], query, tc.transactionFailReason) + } if !strings.Contains(b, tc.expResponse) { t.Fatalf("expected response: %q; got: %q for %q", tc.expResponse, b, tc.name) } @@ -972,6 +1043,7 @@ var ( } if r.URL.Path == "/badGateway" { w.WriteHeader(http.StatusBadGateway) + fmt.Fprintln(w, badGatewayResponse) return }