From 7bdfdb5705bd663254047f5429d8c385a2113b15 Mon Sep 17 00:00:00 2001
From: Nir
Date: Fri, 1 Sep 2023 15:29:34 +0300
Subject: [PATCH] fix: uneven load among clickhouse shards caused by retry
 error mechanism (#357)

* fix uneven load among clickhouse shards caused by retry error mechanism

* CR changes

1. add the penaltySize magic number so the test can assert the errored
   host's load (counter + penalty)
2. update comments
---
 proxy.go           | 18 +++++++++-----
 proxyretry_test.go | 58 +++++++++++++++++++++++++---------------------
 2 files changed, 43 insertions(+), 33 deletions(-)

diff --git a/proxy.go b/proxy.go
index d93c0e03..aa1f33ed 100644
--- a/proxy.go
+++ b/proxy.go
@@ -238,17 +238,23 @@ func executeWithRetry(
 		if rw.StatusCode() == http.StatusBadGateway {
 			log.Debugf("the invalid host is: %s", s.host.addr)
 			s.host.penalize()
-			// comment s.host.dec() line to avoid double increment; issue #322
-			// s.host.dec()
 			atomic.StoreUint32(&s.host.active, uint32(0))
-			newHost := s.host.replica.cluster.getHost()
+			nextHost := s.host.replica.cluster.getHost()
 			// The query could be retried if it has no stickiness to a certain server
-			if numRetry < maxRetry && newHost.isActive() && s.sessionId == "" {
+			if numRetry < maxRetry && nextHost.isActive() && s.sessionId == "" {
 				// the query execution has been failed
 				monitorRetryRequestInc(s.labels)
-
+				currentHost := s.host
+
+				// decrement the failed host's counter and increment the new host's: when the
+				// request finishes, the closing scope decrements s.host, which by then points
+				// at the new host. PR - https://github.com/ContentSquare/chproxy/pull/357
+				if currentHost != nextHost {
+					currentHost.dec()
+					nextHost.inc()
+				}
 				// update host
-				s.host = newHost
+				s.host = nextHost
 				req.URL.Host = s.host.addr.Host
 				req.URL.Scheme = s.host.addr.Scheme
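
To make the counter hand-off concrete: a host's load() is what getHost()
compares when routing queries, and it is the sum of the in-flight request
counter and a temporary error penalty. The sketch below is a minimal,
runnable illustration of that accounting; the inc/dec/penalize/load shapes
follow the identifiers in this patch, but the constant values and function
bodies are simplified assumptions, not chproxy's exact implementation.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Illustrative values; chproxy defines its own penalty constants.
const (
	penaltySize     = 5
	penaltyMaxSize  = 300
	penaltyDuration = 10 * time.Second
)

type host struct {
	counter uint32 // in-flight requests
	penalty uint32 // temporary surcharge applied after errors
}

func (h *host) inc() { atomic.AddUint32(&h.counter, 1) }
func (h *host) dec() { atomic.AddUint32(&h.counter, ^uint32(0)) } // wraps if counter is 0

// penalize inflates the host's apparent load for a while so that
// least-loaded selection steers traffic away from it.
func (h *host) penalize() {
	if atomic.LoadUint32(&h.penalty) >= penaltyMaxSize {
		return
	}
	atomic.AddUint32(&h.penalty, penaltySize)
	time.AfterFunc(penaltyDuration, func() {
		atomic.AddUint32(&h.penalty, ^uint32(penaltySize-1)) // subtract penaltySize
	})
}

// load is the value compared when picking a host: counter + penalty.
func (h *host) load() uint32 {
	return atomic.LoadUint32(&h.counter) + atomic.LoadUint32(&h.penalty)
}

func main() {
	failed, next := &host{}, &host{}
	failed.inc()      // scope opens on the host that will return 502
	failed.penalize() // 502 received: penalize the failed host
	failed.dec()      // the fix: hand the in-flight count over...
	next.inc()        // ...to the host the query is retried on
	next.dec()        // scope close decrements the (new) current host
	fmt.Println(failed.load(), next.load()) // "5 0": only the decaying penalty remains
}

Without the hand-off, the scope close would decrement the retried-to host
with no matching increment (wrapping its uint32 counter if it was idle),
while the failed host kept a phantom in-flight request forever; both
distortions skew getHost() and cause the uneven shard load this patch fixes.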
diff --git a/proxyretry_test.go b/proxyretry_test.go
index b941c87b..3650c0da 100644
--- a/proxyretry_test.go
+++ b/proxyretry_test.go
@@ -2,7 +2,6 @@ package main
 
 import (
 	"context"
-	"fmt"
 	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
@@ -140,6 +139,8 @@ func TestQueryWithRetrySuccess(t *testing.T) {
 
 	retryNum := 1
 
+	erroredHost := s.host
+
 	_, err := executeWithRetry(
 		context.Background(),
 		s,
@@ -154,7 +155,17 @@
 	if err != nil {
 		t.Errorf("The execution with retry failed, %v", err)
 	}
-	assert.Equal(t, srw.statusCode, 200)
+	assert.Equal(t, 200, srw.statusCode)
+	assert.Equal(t, 1, int(s.host.counter.load()))
+	assert.Equal(t, 0, int(s.host.penalty))
+	// should be counter + penalty
+	assert.Equal(t, 1, int(s.host.load()))
+
+	assert.Equal(t, 0, int(erroredHost.counter.load()))
+	assert.Equal(t, penaltySize, int(erroredHost.penalty))
+	// should be counter + penalty
+	assert.Equal(t, penaltySize, int(erroredHost.load()))
+
 	assert.Equal(t, mhs.hs, mhs.hst)
 }
@@ -187,55 +198,48 @@ func newRequest(host, body string) *http.Request {
 	return req
 }
 
-func newHostsCluster(hs []string) ([]*host, *cluster) {
+func newHostsCluster(hs []string) *cluster {
 	// set up cluster, replicas, hosts
 	cluster1 := &cluster{
 		name: "cluster1",
 	}
 
-	var urls []*url.URL
+	var hosts []*host
 
-	var replicas []*replica
+	replica1 := &replica{
+		cluster:     cluster1,
+		name:        "replica1",
+		nextHostIdx: 0,
+	}
 
-	var hosts []*host
+	cluster1.replicas = []*replica{replica1}
 
 	for i := 0; i < len(hs); i++ {
-		urli := &url.URL{
+		url1 := &url.URL{
 			Scheme: "http",
 			Host:   hs[i],
 		}
 
-		replicai := &replica{
-			cluster:     cluster1,
-			name:        fmt.Sprintf("replica%d", i+1),
-			nextHostIdx: 0,
-		}
-
-		urls = append(urls, urli)
-		replicas = append(replicas, replicai)
-	}
-
-	cluster1.replicas = replicas
-
-	for i := 0; i < len(hs); i++ {
 		hosti := &host{
-			replica: replicas[i],
-			penalty: 1000,
+			replica: replica1,
+			penalty: 0,
 			active:  1,
-			addr:    urls[i],
+			addr:    url1,
 		}
 
 		hosts = append(hosts, hosti)
 	}
+	replica1.hosts = hosts
 
-	replicas[0].hosts = hosts
-
-	return hosts, cluster1
+	return cluster1
 }
 
 func newMockScope(hs []string) *scope {
-	hosts, c := newHostsCluster(hs)
+	c := newHostsCluster(hs)
+	scopedHost := c.replicas[0].hosts[0]
+	scopedHost.inc()
 
 	return &scope{
 		startTime: time.Now(),
-		host:      hosts[0],
+		host:      scopedHost,
 		cluster:   c,
 		labels: prometheus.Labels{
 			"user":    "default",
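
The reworked helpers make the new assertions deterministic: all hosts now
share a single replica (previously each host got its own), penalty starts at
0 rather than 1000, and newMockScope explicitly inc()s the scoped host to
mirror the increment a real request receives when its scope opens. The
errored host is therefore expected to end at counter 0 with load() equal to
penaltySize, while the retried-to host ends at counter 1. These values
matter because routing prefers the smallest load(); roughly as below,
extending the sketch after the proxy.go hunk (an assumption: the real
cluster.getHost also skips inactive hosts and rotates via nextHostIdx,
both omitted here).

// leastLoaded is a hypothetical stand-in for cluster.getHost: it picks the
// host whose counter+penalty load is smallest, so a host carrying a phantom
// in-flight count (or a fresh penalty) receives proportionally less traffic.
func leastLoaded(hosts []*host) *host {
	best := hosts[0]
	for _, h := range hosts[1:] {
		if h.load() < best.load() {
			best = h
		}
	}
	return best
}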