Skip to content

Commit

Permalink
fix: uneven load among clickhouse shards caused by retry error mechan…
Browse files Browse the repository at this point in the history
…ism (#357)

* fix uneven load among clickhouse shards caused by retry error mechanism

* CR changes 
1. add magic number penaltySize for test assert the number of error host's load + penalty result
2. update comments
  • Loading branch information
nir3c authored Sep 1, 2023
1 parent 6fb8251 commit 7bdfdb5
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 33 deletions.
18 changes: 12 additions & 6 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,17 +238,23 @@ func executeWithRetry(
if rw.StatusCode() == http.StatusBadGateway {
log.Debugf("the invalid host is: %s", s.host.addr)
s.host.penalize()
// comment s.host.dec() line to avoid double increment; issue #322
// s.host.dec()
atomic.StoreUint32(&s.host.active, uint32(0))
newHost := s.host.replica.cluster.getHost()
nextHost := s.host.replica.cluster.getHost()
// The query could be retried if it has no stickiness to a certain server
if numRetry < maxRetry && newHost.isActive() && s.sessionId == "" {
if numRetry < maxRetry && nextHost.isActive() && s.sessionId == "" {
// the query execution has been failed
monitorRetryRequestInc(s.labels)

currentHost := s.host

// decrement the current failed host counter and increment the new host
// as for the end of the requests we will close the scope and in that closed scope
// decrement the new host PR - https://github.com/ContentSquare/chproxy/pull/357
if currentHost != nextHost {
currentHost.dec()
nextHost.inc()
}
// update host
s.host = newHost
s.host = nextHost

req.URL.Host = s.host.addr.Host
req.URL.Scheme = s.host.addr.Scheme
Expand Down
58 changes: 31 additions & 27 deletions proxyretry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -140,6 +139,8 @@ func TestQueryWithRetrySuccess(t *testing.T) {

retryNum := 1

erroredHost := s.host

_, err := executeWithRetry(
context.Background(),
s,
Expand All @@ -154,7 +155,17 @@ func TestQueryWithRetrySuccess(t *testing.T) {
if err != nil {
t.Errorf("The execution with retry failed, %v", err)
}
assert.Equal(t, srw.statusCode, 200)
assert.Equal(t, 200, srw.statusCode)
assert.Equal(t, 1, int(s.host.counter.load()))
assert.Equal(t, 0, int(s.host.penalty))
// should be counter + penalty
assert.Equal(t, 1, int(s.host.load()))

assert.Equal(t, 0, int(erroredHost.counter.load()))
assert.Equal(t, penaltySize, int(erroredHost.penalty))
// should be counter + penalty
assert.Equal(t, penaltySize, int(erroredHost.load()))

assert.Equal(t, mhs.hs, mhs.hst)
}

Expand Down Expand Up @@ -187,55 +198,48 @@ func newRequest(host, body string) *http.Request {
return req
}

func newHostsCluster(hs []string) ([]*host, *cluster) {
func newHostsCluster(hs []string) *cluster {
// set up cluster, replicas, hosts
cluster1 := &cluster{
name: "cluster1",
}

var urls []*url.URL
var hosts []*host

var replicas []*replica
replica1 := &replica{
cluster: cluster1,
name: "replica1",
nextHostIdx: 0,
}

var hosts []*host
cluster1.replicas = []*replica{replica1}

for i := 0; i < len(hs); i++ {
urli := &url.URL{
url1 := &url.URL{
Scheme: "http",
Host: hs[i],
}
replicai := &replica{
cluster: cluster1,
name: fmt.Sprintf("replica%d", i+1),
nextHostIdx: 0,
}
urls = append(urls, urli)
replicas = append(replicas, replicai)
}

cluster1.replicas = replicas

for i := 0; i < len(hs); i++ {
hosti := &host{
replica: replicas[i],
penalty: 1000,
replica: replica1,
penalty: 0,
active: 1,
addr: urls[i],
addr: url1,
}
hosts = append(hosts, hosti)
}
replica1.hosts = hosts

replicas[0].hosts = hosts

return hosts, cluster1
return cluster1
}

func newMockScope(hs []string) *scope {
hosts, c := newHostsCluster(hs)
c := newHostsCluster(hs)
scopedHost := c.replicas[0].hosts[0]
scopedHost.inc()

return &scope{
startTime: time.Now(),
host: hosts[0],
host: scopedHost,
cluster: c,
labels: prometheus.Labels{
"user": "default",
Expand Down

0 comments on commit 7bdfdb5

Please sign in to comment.