Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

max_error_reason_size limit error reason #386

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ param_groups:
# Named network lists
network_groups: <network_groups_config> ... [optional]

# Maximum total size of fail reason of queries. Config prevents large tmp files from being read into memory, affects only cachable queries
# The default value is set to 1 Petabyte.
# If error reason exceeds limit "unknown error reason" will be stored as a fail reason
max_error_reason_size: <byte_size>

# Settings for connection pool to ClickHouse
connection_pool:
max_idle_conns: 100
Expand Down
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ var (

defaultMaxPayloadSize = ByteSize(1 << 50)

defaultMaxErrorReasonSize = ByteSize(1 << 50)

defaultRetryNumber = 0
)

Expand All @@ -67,6 +69,9 @@ type Config struct {

NetworkGroups []NetworkGroups `yaml:"network_groups,omitempty"`

// Maximum size of error payload
MaxErrorReasonSize ByteSize `yaml:"max_error_reason_size,omitempty"`

Caches []Cache `yaml:"caches,omitempty"`

ParamGroups []ParamGroup `yaml:"param_groups,omitempty"`
Expand Down Expand Up @@ -188,6 +193,10 @@ func (cfg *Config) setDefaults() error {
c.setDefaults()
}

if cfg.MaxErrorReasonSize <= 0 {
cfg.MaxErrorReasonSize = defaultMaxErrorReasonSize
}

cfg.setServerMaxResponseTime(maxResponseTime)

return nil
Expand Down
10 changes: 9 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ var fullConfig = Config{
},
},
},
networkReg: map[string]Networks{},
MaxErrorReasonSize: ByteSize(100 << 20),
networkReg: map[string]Networks{},
}

func TestLoadConfig(t *testing.T) {
Expand Down Expand Up @@ -319,6 +320,7 @@ func TestLoadConfig(t *testing.T) {
MaxExecutionTime: Duration(120 * time.Second),
},
},
MaxErrorReasonSize: ByteSize(1 << 50),
},
},
}
Expand Down Expand Up @@ -505,6 +507,11 @@ func TestBadConfig(t *testing.T) {
"testdata/bad.proxy_settings.yml",
"`proxy_header` cannot be set without enabling proxy settings",
},
{
"max error reason size",
"testdata/bad.max_error_reason_size.yml",
"cannot parse byte size \"-10B\": it must be positive float followed by optional units. For example, 1.5Gb, 3T",
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -880,6 +887,7 @@ network_groups:
- name: reporting-apps
networks:
- 10.10.10.0/24
max_error_reason_size: 104857600
caches:
- mode: file_system
name: longterm
Expand Down
21 changes: 21 additions & 0 deletions config/testdata/bad.max_error_reason_size.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
caches:
- name: "longterm"
mode: "file_system"
file_system:
dir: "cache_dir"
max_size: 100Gb

server:
http:
listen_addr: ":8080"

users:
- name: "dummy"
allowed_networks: ["1.2.3.4"]
to_cluster: "cluster"
to_user: "default"

clusters:
- name: "cluster"
nodes: ["127.0.1.1:8123"]
max_error_reason_size: "-10B"
2 changes: 2 additions & 0 deletions config/testdata/full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ network_groups:
- name: "reporting-apps"
networks: ["10.10.10.0/24"]

max_error_reason_size: 100Mb

# Optional lists of query params to send with each proxied request to ClickHouse.
# These lists may be used for overriding ClickHouse settings on a per-user basis.
param_groups:
Expand Down
4 changes: 4 additions & 0 deletions docs/src/content/docs/configuration/default.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ network_groups:
- name: "reporting-apps"
networks: ["10.10.10.0/24"]

# Maximum total size of fail reason of queries. Config prevents large tmp files from being read into memory, affects only cachable queries
# If error reason exceeds limit "unknown error reason" will be stored as a fail reason
max_error_reason_size: 100GB

# Optional lists of query params to send with each proxied request to ClickHouse.
# These lists may be used for overriding ClickHouse settings on a per-user basis.
param_groups:
Expand Down
17 changes: 13 additions & 4 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type reverseProxy struct {
hasWildcarded bool
maxIdleConns int
maxIdleConnsPerHost int
maxErrorReasonSize int64
}

func newReverseProxy(cfgCp *config.ConnectionPool) *reverseProxy {
Expand Down Expand Up @@ -451,12 +452,18 @@ func (rp *reverseProxy) serveFromCache(s *scope, srw *statResponseWriter, req *h
tmpFileRespWriter.WriteHeader(srw.statusCode)
}

errString, err := toString(reader)
if err != nil {
log.Errorf("%s failed to get error reason: %s", s, err.Error())
errReason := "unknown error reason"
if contentLength > rp.maxErrorReasonSize {
log.Infof("%s: Error reason length (%d) is greater than max error reason size (%d)", s, contentLength, rp.maxErrorReasonSize)
} else {
errString, err := toString(reader)
if err != nil {
log.Errorf("%s failed to get error reason: %s", s, err.Error())
}

errReason = fmt.Sprintf("%s %s", failedTransactionPrefix, errString)
}

errReason := fmt.Sprintf("%s %s", failedTransactionPrefix, errString)
rp.completeTransaction(s, statusCode, userCache, key, q, errReason)

// we need to reset the offset since the reader of tmpFileRespWriter was already
Expand Down Expand Up @@ -622,6 +629,8 @@ func (rp *reverseProxy) applyConfig(cfg *config.Config) error {
return err
}

rp.maxErrorReasonSize = int64(cfg.MaxErrorReasonSize)

caches := make(map[string]*cache.AsyncCache, len(cfg.Caches))
defer func() {
// caches is swapped with old caches from rp.caches
Expand Down
92 changes: 82 additions & 10 deletions proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"crypto/tls"
"fmt"
"hash/fnv"
"io"
"math/rand"
"net"
Expand Down Expand Up @@ -41,7 +42,10 @@ const (
okResponse = "1"
badGatewayResponse = "]: cannot reach 127.0.0.1:"
)
const testCacheDir = "./test-cache-data"
const (
testCacheDir = "./test-cache-data"
fileSystemCache = "file_system_cache"
)

var goodCfg = &config.Config{
Server: config.Server{
Expand Down Expand Up @@ -97,15 +101,56 @@ var goodCfgWithCache = &config.Config{
Name: defaultUsername,
ToCluster: "cluster",
ToUser: "web",
Cache: "file_system_cache",
Cache: fileSystemCache,
},
},
ParamGroups: []config.ParamGroup{
{Name: "param_test", Params: []config.Param{{Key: "param_key", Value: "param_value"}}},
},
Caches: []config.Cache{
{
Name: "file_system_cache",
Name: fileSystemCache,
Mode: "file_system",
FileSystem: config.FileSystemCacheConfig{
Dir: testCacheDir,
MaxSize: config.ByteSize(1024 * 1024),
},
Expire: config.Duration(1000 * 60 * 60),
},
},
MaxErrorReasonSize: config.ByteSize(100 << 20),
}
var goodCfgWithCacheAndMaxErrorReasonSize = &config.Config{
Clusters: []config.Cluster{
{
Name: "cluster",
Scheme: "http",
Replicas: []config.Replica{
{
Nodes: []string{"localhost:8123"},
},
},
ClusterUsers: []config.ClusterUser{
{
Name: "web",
},
},
},
},
Users: []config.User{
{
Name: defaultUsername,
ToCluster: "cluster",
ToUser: "web",
Cache: fileSystemCache,
},
},
ParamGroups: []config.ParamGroup{
{Name: "param_test", Params: []config.Param{{Key: "param_key", Value: "param_value"}}},
},
Caches: []config.Cache{
{
Name: fileSystemCache,
Mode: "file_system",
FileSystem: config.FileSystemCacheConfig{
Dir: testCacheDir,
Expand Down Expand Up @@ -310,21 +355,32 @@ var fullWildcardedCfg = &config.Config{
},
}

func compareTransactionFailReason(t *testing.T, p *reverseProxy, user config.ClusterUser, query string, failReason string) {
h := fnv.New32a()
h.Write([]byte(user.Name + user.Password))
transactionKey := cache.NewKey([]byte(query), url.Values{"query": []string{query}}, "", 0, 0, h.Sum32())
transactionStatus, err := p.caches[fileSystemCache].TransactionRegistry.Status(transactionKey)
assert.Nil(t, err)
assert.Equal(t, failReason, transactionStatus.FailReason)
}

func TestReverseProxy_ServeHTTP1(t *testing.T) {
query := "SELECT123456"
testCases := []struct {
cfg *config.Config
name string
expResponse string
expStatusCode int
f func(p *reverseProxy) *http.Response
cfg *config.Config
name string
expResponse string
expStatusCode int
f func(p *reverseProxy) *http.Response
transactionFailReason string
}{
{
cfg: goodCfg,
name: "Bad gatway response without cache",
expResponse: badGatewayResponse,
expStatusCode: http.StatusBadGateway,
f: func(p *reverseProxy) *http.Response {
req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=SELECT123456", fakeServer.URL), nil)
req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=%s", fakeServer.URL, query), nil)
return makeCustomRequest(p, req)
},
},
Expand All @@ -334,11 +390,12 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
expResponse: badGatewayResponse,
expStatusCode: http.StatusBadGateway,
f: func(p *reverseProxy) *http.Response {
req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=SELECT123456", fakeServer.URL), nil)
req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=%s", fakeServer.URL, query), nil)
// cleaning the cache to be sure it will be a cache miss although the query isn't supposed to be cached
os.RemoveAll(testCacheDir)
return makeCustomRequest(p, req)
},
transactionFailReason: "[concurrent query failed] ]: cannot reach 127.0.0.1:\n",
},
{
cfg: goodCfg,
Expand Down Expand Up @@ -705,6 +762,17 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
return makeHeavyRequest(p, time.Millisecond*200)
},
},
{
cfg: goodCfgWithCacheAndMaxErrorReasonSize,
name: "max error reason size",
expResponse: badGatewayResponse,
expStatusCode: http.StatusBadGateway,
f: func(p *reverseProxy) *http.Response {
req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=%s", fakeServer.URL, query), nil)
return makeCustomRequest(p, req)
},
transactionFailReason: "unknown error reason",
},
}

for _, tc := range testCases {
Expand All @@ -717,6 +785,9 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
resp := tc.f(proxy)
b := bbToString(t, resp.Body)
resp.Body.Close()
if len(tc.cfg.Caches) != 0 {
compareTransactionFailReason(t, proxy, tc.cfg.Clusters[0].ClusterUsers[0], query, tc.transactionFailReason)
}
if !strings.Contains(b, tc.expResponse) {
t.Fatalf("expected response: %q; got: %q for %q", tc.expResponse, b, tc.name)
}
Expand Down Expand Up @@ -972,6 +1043,7 @@ var (
}
if r.URL.Path == "/badGateway" {
w.WriteHeader(http.StatusBadGateway)
fmt.Fprintln(w, badGatewayResponse)
return
}

Expand Down
Loading