From 73bdd517488a509040c32f68a64d41a662ca84c1 Mon Sep 17 00:00:00 2001
From: Lennard Eijsackers
Date: Fri, 15 Sep 2023 12:51:25 +0200
Subject: [PATCH] chore: Refactor host in scope and separate to new package topology (#356)

* chore: Refactor host in scope and separate to new package topology

*Motivation:*

Right now a lot of the code base in chproxy resides in a shared main package. This has led to a lot of coupled code and a code base that is very hard to read. This PR is a continuation of the work started with the heartbeat to move code away from the main package, decouple the code and improve readability.

*Additions and Changes:*

- Create a new `Node` struct in the new `topology` package that exposes methods from the previous `host` struct. However, it doesn't expose internal state.
- Improve the Node code by using more modern constructs such as `atomic.Bool`.
- Update the scope package and every usage of `host` with `topology.Node`.
- Include a new test case for `Node.StartHeartbeat`

*Notes:*

Due to the coupled nature of the code around scope, I didn't see an opportunity to do this incrementally. The PR will sadly be large and hard to review.

Signed-off-by: Lennard Eijsackers

* chore: Resolve comments on PR

---------

Signed-off-by: Lennard Eijsackers
---
 internal/counter/counter.go    |  15 +++
 internal/topology/main_test.go |  23 ++++
 internal/topology/metrics.go   |  61 ++++++++++
 internal/topology/node.go      | 194 ++++++++++++++++++++++++++++++++
 internal/topology/node_test.go |  84 ++++++++++++++
 metrics.go                     |  23 +---
 proxy.go                       |  38 ++++---
 proxy_test.go                  |   4 +-
 proxyretry_test.go             |  27 +++--
 scope.go                       | 137 +++++------------------
 scope_test.go                  | 198 +++++++++------------------
 11 files changed, 500 insertions(+), 304 deletions(-)
 create mode 100644 internal/counter/counter.go
 create mode 100644 internal/topology/main_test.go
 create mode 100644 internal/topology/metrics.go
 create mode 100644 internal/topology/node.go
 create mode 100644 internal/topology/node_test.go

diff --git a/internal/counter/counter.go b/internal/counter/counter.go
new file mode 100644
index 00000000..3b8b94c1
--- /dev/null
+++ b/internal/counter/counter.go
@@ -0,0 +1,15 @@
+package counter
+
+import "sync/atomic"
+
+type Counter struct {
+	value atomic.Uint32
+}
+
+func (c *Counter) Store(n uint32) { c.value.Store(n) }
+
+func (c *Counter) Load() uint32 { return c.value.Load() }
+
+func (c *Counter) Dec() { c.value.Add(^uint32(0)) }
+
+func (c *Counter) Inc() uint32 { return c.value.Add(1) }
diff --git a/internal/topology/main_test.go b/internal/topology/main_test.go
new file mode 100644
index 00000000..4322a64d
--- /dev/null
+++ b/internal/topology/main_test.go
@@ -0,0 +1,23 @@
+package topology
+
+import (
+	"os"
+	"testing"
+
+	"github.com/contentsquare/chproxy/config"
+)
+
+func TestMain(m *testing.M) {
+	cfg := &config.Config{
+		Server: config.Server{
+			Metrics: config.Metrics{
+				Namespace: "test",
+			},
+		},
+	}
+
+	// Metrics should be preregistered to avoid nil-panics.
+	RegisterMetrics(cfg)
+	code := m.Run()
+	os.Exit(code)
+}
diff --git a/internal/topology/metrics.go b/internal/topology/metrics.go
new file mode 100644
index 00000000..21000965
--- /dev/null
+++ b/internal/topology/metrics.go
@@ -0,0 +1,61 @@
+package topology
+
+// TODO this is only here to avoid recursive imports. We should have a separate package for metrics.
+import (
+	"github.com/contentsquare/chproxy/config"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+	HostHealth    *prometheus.GaugeVec
+	HostPenalties *prometheus.CounterVec
+)
+
+func initMetrics(cfg *config.Config) {
+	namespace := cfg.Server.Metrics.Namespace
+	HostHealth = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: namespace,
+			Name:      "host_health",
+			Help:      "Health state of hosts by clusters",
+		},
+		[]string{"cluster", "replica", "cluster_node"},
+	)
+	HostPenalties = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: namespace,
+			Name:      "host_penalties_total",
+			Help:      "Total number of given penalties by host",
+		},
+		[]string{"cluster", "replica", "cluster_node"},
+	)
+}
+
+func RegisterMetrics(cfg *config.Config) {
+	initMetrics(cfg)
+	prometheus.MustRegister(HostHealth, HostPenalties)
+}
+
+func reportNodeHealthMetric(clusterName, replicaName, nodeName string, active bool) {
+	label := prometheus.Labels{
+		"cluster":      clusterName,
+		"replica":      replicaName,
+		"cluster_node": nodeName,
+	}
+
+	if active {
+		HostHealth.With(label).Set(1)
+	} else {
+		HostHealth.With(label).Set(0)
+	}
+}
+
+func incrementPenaltiesMetric(clusterName, replicaName, nodeName string) {
+	label := prometheus.Labels{
+		"cluster":      clusterName,
+		"replica":      replicaName,
+		"cluster_node": nodeName,
+	}
+
+	HostPenalties.With(label).Inc()
+}
diff --git a/internal/topology/node.go b/internal/topology/node.go
new file mode 100644
index 00000000..8c2825f5
--- /dev/null
+++ b/internal/topology/node.go
@@ -0,0 +1,194 @@
+package topology
+
+import (
+	"context"
+	"net/url"
+	"sync/atomic"
+	"time"
+
+	"github.com/contentsquare/chproxy/internal/counter"
+	"github.com/contentsquare/chproxy/internal/heartbeat"
+	"github.com/contentsquare/chproxy/log"
+)
+
+const (
+	// prevents excess goroutine creation while penalizing an overloaded host
+	DefaultPenaltySize     = 5
+	DefaultMaxSize         = 300
+	DefaultPenaltyDuration = time.Second * 10
+)
+
+type nodeOpts struct {
+	defaultActive   bool
+	penaltySize     uint32
+	penaltyMaxSize  uint32
+	penaltyDuration time.Duration
+}
+
+func defaultNodeOpts() nodeOpts {
+	return nodeOpts{
+		penaltySize:     DefaultPenaltySize,
+		penaltyMaxSize:  DefaultMaxSize,
+		penaltyDuration: DefaultPenaltyDuration,
+	}
+}
+
+type NodeOption interface {
+	apply(*nodeOpts)
+}
+
+type defaultActive struct {
+	active bool
+}
+
+func (o defaultActive) apply(opts *nodeOpts) {
+	opts.defaultActive = o.active
+}
+
+func WithDefaultActiveState(active bool) NodeOption {
+	return defaultActive{
+		active: active,
+	}
+}
+
+type Node struct {
+	// Node address.
+	addr *url.URL
+
+	// Whether this node is alive.
+	active atomic.Bool
+
+	// Counter of currently running connections.
+	connections counter.Counter
+
+	// Counter of unsuccessful requests, used to decrease host priority.
+	penalty atomic.Uint32
+
+	// Heartbeat function
+	hb heartbeat.HeartBeat
+
+	// TODO These fields are only used for labels in prometheus. We should have a different way to pass the labels.
+	// For metrics only
+	clusterName string
+	replicaName string
+
+	// Additional configuration options
+	opts nodeOpts
+}
+
+func NewNode(addr *url.URL, hb heartbeat.HeartBeat, clusterName, replicaName string, opts ...NodeOption) *Node {
+	nodeOpts := defaultNodeOpts()
+
+	for _, opt := range opts {
+		opt.apply(&nodeOpts)
+	}
+
+	n := &Node{
+		addr:        addr,
+		hb:          hb,
+		clusterName: clusterName,
+		replicaName: replicaName,
+		opts:        nodeOpts,
+	}
+
+	if n.opts.defaultActive {
+		n.SetIsActive(true)
+	}
+
+	return n
+}
+
+func (n *Node) IsActive() bool {
+	return n.active.Load()
+}
+
+func (n *Node) SetIsActive(active bool) {
+	n.active.Store(active)
+}
+
+// StartHeartbeat runs the heartbeat healthcheck against the node
+// until the done channel is closed.
+// If the heartbeat fails, the active status of the node is changed.
+func (n *Node) StartHeartbeat(done <-chan struct{}) {
+	ctx, cancel := context.WithCancel(context.Background())
+	for {
+		n.heartbeat(ctx)
+		select {
+		case <-done:
+			cancel()
+			return
+		case <-time.After(n.hb.Interval()):
+		}
+	}
+}
+
+func (n *Node) heartbeat(ctx context.Context) {
+	if err := n.hb.IsHealthy(ctx, n.addr.String()); err == nil {
+		n.active.Store(true)
+		reportNodeHealthMetric(n.clusterName, n.replicaName, n.Host(), true)
+	} else {
+		log.Errorf("error while health-checking %q host: %s", n.Host(), err)
+		n.active.Store(false)
+		reportNodeHealthMetric(n.clusterName, n.replicaName, n.Host(), false)
+	}
+}
+
+// Penalize decreases the priority of a node after a failed request.
+// If the penalty is already at the maximum allowed size, this function
+// will not penalize the node further.
+// A function will be registered to run after the penalty duration to
+// increase the priority again.
+func (n *Node) Penalize() {
+	penalty := n.penalty.Load()
+	if penalty >= n.opts.penaltyMaxSize {
+		return
+	}
+
+	incrementPenaltiesMetric(n.clusterName, n.replicaName, n.Host())
+
+	n.penalty.Add(n.opts.penaltySize)
+
+	time.AfterFunc(n.opts.penaltyDuration, func() {
+		n.penalty.Add(^uint32(n.opts.penaltySize - 1))
+	})
+}
+
+// CurrentLoad returns the current load of the node: the number of open
+// connections plus the penalty.
+func (n *Node) CurrentLoad() uint32 { + c := n.connections.Load() + p := n.penalty.Load() + return c + p +} + +func (n *Node) CurrentConnections() uint32 { + return n.connections.Load() +} + +func (n *Node) CurrentPenalty() uint32 { + return n.penalty.Load() +} + +func (n *Node) IncrementConnections() { + n.connections.Inc() +} + +func (n *Node) DecrementConnections() { + n.connections.Dec() +} + +func (n *Node) Scheme() string { + return n.addr.Scheme +} + +func (n *Node) Host() string { + return n.addr.Host +} + +func (n *Node) ReplicaName() string { + return n.replicaName +} + +func (n *Node) String() string { + return n.addr.String() +} diff --git a/internal/topology/node_test.go b/internal/topology/node_test.go new file mode 100644 index 00000000..de10fbf5 --- /dev/null +++ b/internal/topology/node_test.go @@ -0,0 +1,84 @@ +package topology + +import ( + "context" + "errors" + "net/url" + "testing" + "time" + + "github.com/contentsquare/chproxy/internal/heartbeat" + "github.com/stretchr/testify/assert" +) + +var _ heartbeat.HeartBeat = &mockHeartbeat{} + +type mockHeartbeat struct { + interval time.Duration + err error +} + +func (hb *mockHeartbeat) Interval() time.Duration { + return hb.interval +} + +func (hb *mockHeartbeat) IsHealthy(ctx context.Context, addr string) error { + return hb.err +} + +func TestPenalize(t *testing.T) { + node := NewNode(&url.URL{Host: "127.0.0.1"}, nil, "test", "test") + expectedLoad := uint32(0) + assert.Equal(t, expectedLoad, node.CurrentLoad(), "got running queries %d; expected %d", node.CurrentLoad(), expectedLoad) + + node.Penalize() + expectedLoad = uint32(DefaultPenaltySize) + assert.Equal(t, expectedLoad, node.CurrentLoad(), "got running queries %d; expected %d", node.CurrentLoad(), expectedLoad) + + // do more penalties than `penaltyMaxSize` allows + max := int(DefaultMaxSize/DefaultPenaltySize) * 2 + for i := 0; i < max; i++ { + node.Penalize() + } + + expectedLoad = uint32(DefaultMaxSize) + assert.Equal(t, expectedLoad, node.CurrentLoad(), "got running queries %d; expected %d", node.CurrentLoad(), expectedLoad) + + // Still allow connections to increase. + node.IncrementConnections() + expectedLoad++ + assert.Equal(t, expectedLoad, node.CurrentLoad(), "got running queries %d; expected %d", node.CurrentLoad(), expectedLoad) +} + +func TestStartHeartbeat(t *testing.T) { + hb := &mockHeartbeat{ + interval: 10 * time.Millisecond, + err: nil, + } + + done := make(chan struct{}) + defer close(done) + + node := NewNode(&url.URL{Host: "127.0.0.1"}, hb, "test", "test") + + // Node is eventually active after start. + go node.StartHeartbeat(done) + + assert.Eventually(t, func() bool { + return node.IsActive() + }, time.Second, 100*time.Millisecond) + + // change heartbeat to error, node eventually becomes inactive. + hb.err = errors.New("failed connection") + + assert.Eventually(t, func() bool { + return !node.IsActive() + }, time.Second, 100*time.Millisecond) + + // If error is removed node becomes active again. 
+ hb.err = nil + + assert.Eventually(t, func() bool { + return node.IsActive() + }, time.Second, 100*time.Millisecond) +} diff --git a/metrics.go b/metrics.go index fd4c9f96..0e5fbf3c 100644 --- a/metrics.go +++ b/metrics.go @@ -2,6 +2,7 @@ package main import ( "github.com/contentsquare/chproxy/config" + "github.com/contentsquare/chproxy/internal/topology" "github.com/prometheus/client_golang/prometheus" ) @@ -10,8 +11,6 @@ var ( requestSum *prometheus.CounterVec requestSuccess *prometheus.CounterVec limitExcess *prometheus.CounterVec - hostPenalties *prometheus.CounterVec - hostHealth *prometheus.GaugeVec concurrentQueries *prometheus.GaugeVec requestQueueSize *prometheus.GaugeVec userQueueOverflow *prometheus.CounterVec @@ -73,22 +72,6 @@ func initMetrics(cfg *config.Config) { }, []string{"user", "cluster", "cluster_user", "replica", "cluster_node"}, ) - hostPenalties = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Name: "host_penalties_total", - Help: "Total number of given penalties by host", - }, - []string{"cluster", "replica", "cluster_node"}, - ) - hostHealth = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: namespace, - Name: "host_health", - Help: "Health state of hosts by clusters", - }, - []string{"cluster", "replica", "cluster_node"}, - ) concurrentQueries = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: namespace, @@ -287,9 +270,11 @@ func initMetrics(cfg *config.Config) { } func registerMetrics(cfg *config.Config) { + topology.RegisterMetrics(cfg) + initMetrics(cfg) prometheus.MustRegister(statusCodes, requestSum, requestSuccess, - limitExcess, hostPenalties, hostHealth, concurrentQueries, + limitExcess, concurrentQueries, requestQueueSize, userQueueOverflow, clusterUserQueueOverflow, requestBodyBytes, responseBodyBytes, cacheFailedInsert, cacheCorruptedFetch, cacheHit, cacheMiss, cacheSize, cacheItems, cacheSkipped, diff --git a/proxy.go b/proxy.go index aa1f33ed..a848e3be 100644 --- a/proxy.go +++ b/proxy.go @@ -14,11 +14,11 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "github.com/contentsquare/chproxy/cache" "github.com/contentsquare/chproxy/config" + "github.com/contentsquare/chproxy/internal/topology" "github.com/contentsquare/chproxy/log" "github.com/prometheus/client_golang/prometheus" ) @@ -166,8 +166,8 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { "user": s.user.name, "cluster": s.cluster.name, "cluster_user": s.clusterUser.name, - "replica": s.host.replica.name, - "cluster_node": s.host.addr.Host, + "replica": s.host.ReplicaName(), + "cluster_node": s.host.Host(), "code": strconv.Itoa(srw.statusCode), }, ).Inc() @@ -236,12 +236,14 @@ func executeWithRetry( // StatusBadGateway response is returned by http.ReverseProxy when // it cannot establish connection to remote host. 
if rw.StatusCode() == http.StatusBadGateway { - log.Debugf("the invalid host is: %s", s.host.addr) - s.host.penalize() - atomic.StoreUint32(&s.host.active, uint32(0)) - nextHost := s.host.replica.cluster.getHost() + log.Debugf("the invalid host is: %s", s.host) + s.host.Penalize() + // comment s.host.dec() line to avoid double increment; issue #322 + // s.host.dec() + s.host.SetIsActive(false) + nextHost := s.cluster.getHost() // The query could be retried if it has no stickiness to a certain server - if numRetry < maxRetry && nextHost.isActive() && s.sessionId == "" { + if numRetry < maxRetry && nextHost.IsActive() && s.sessionId == "" { // the query execution has been failed monitorRetryRequestInc(s.labels) currentHost := s.host @@ -250,20 +252,20 @@ func executeWithRetry( // as for the end of the requests we will close the scope and in that closed scope // decrement the new host PR - https://github.com/ContentSquare/chproxy/pull/357 if currentHost != nextHost { - currentHost.dec() - nextHost.inc() + currentHost.DecrementConnections() + nextHost.IncrementConnections() } // update host s.host = nextHost - req.URL.Host = s.host.addr.Host - req.URL.Scheme = s.host.addr.Scheme - log.Debugf("the valid host is: %s", s.host.addr) + req.URL.Host = s.host.Host() + req.URL.Scheme = s.host.Scheme() + log.Debugf("the valid host is: %s", s.host) } else { since = time.Since(startTime).Seconds() monitorDuration(since) q := getQuerySnippet(req) - err1 := fmt.Errorf("%s: cannot reach %s; query: %q", s, s.host.addr.Host, q) + err1 := fmt.Errorf("%s: cannot reach %s; query: %q", s, s.host.Host(), q) respondWith(srw, err1, srw.StatusCode()) break } @@ -327,7 +329,7 @@ func (rp *reverseProxy) proxyRequest(s *scope, rw ResponseWriterWithCode, srw *s timeoutRequest.With(s.labels).Inc() // Penalize host with the timed out query, because it may be overloaded. - s.host.penalize() + s.host.Penalize() q := getQuerySnippet(req) log.Debugf("%s: query timeout in %f; query: %q", s, executeDuration, q) @@ -754,7 +756,7 @@ func (rp *reverseProxy) restartWithNewConfig(caches map[string]*cache.AsyncCache // Counters and Summary metrics are always relevant. // Gauge metrics may become irrelevant if they may freeze at non-zero // value after config reload. 
- hostHealth.Reset() + topology.HostHealth.Reset() cacheSize.Reset() cacheItems.Reset() @@ -763,8 +765,8 @@ func (rp *reverseProxy) restartWithNewConfig(caches map[string]*cache.AsyncCache for _, r := range c.replicas { for _, h := range r.hosts { rp.reloadWG.Add(1) - go func(h *host) { - h.runHeartbeat(rp.reloadSignal) + go func(h *topology.Node) { + h.StartHeartbeat(rp.reloadSignal) rp.reloadWG.Done() }(h) } diff --git a/proxy_test.go b/proxy_test.go index b30f0724..167352dc 100644 --- a/proxy_test.go +++ b/proxy_test.go @@ -95,8 +95,8 @@ func TestNewReverseProxy(t *testing.T) { if len(r.hosts) != 1 { t.Fatalf("got %d hosts; expResponse: %d", len(r.hosts), 1) } - if r.hosts[0].addr.Host != "localhost:8123" { - t.Fatalf("got %s host; expResponse: %s", r.hosts[0].addr.Host, "localhost:8123") + if r.hosts[0].Host() != "localhost:8123" { + t.Fatalf("got %s host; expResponse: %s", r.hosts[0].Host(), "localhost:8123") } if len(proxy.users) != 1 { t.Fatalf("got %d users; expResponse: %d", len(proxy.users), 1) diff --git a/proxyretry_test.go b/proxyretry_test.go index 3650c0da..d66057bc 100644 --- a/proxyretry_test.go +++ b/proxyretry_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/contentsquare/chproxy/internal/topology" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" ) @@ -156,15 +157,15 @@ func TestQueryWithRetrySuccess(t *testing.T) { t.Errorf("The execution with retry failed, %v", err) } assert.Equal(t, 200, srw.statusCode) - assert.Equal(t, 1, int(s.host.counter.load())) - assert.Equal(t, 0, int(s.host.penalty)) + assert.Equal(t, 1, int(s.host.CurrentConnections())) + assert.Equal(t, 0, int(s.host.CurrentPenalty())) // should be counter + penalty - assert.Equal(t, 1, int(s.host.load())) + assert.Equal(t, 1, int(s.host.CurrentLoad())) - assert.Equal(t, 0, int(erroredHost.counter.load())) - assert.Equal(t, penaltySize, int(erroredHost.penalty)) + assert.Equal(t, 0, int(erroredHost.CurrentConnections())) + assert.Equal(t, topology.DefaultPenaltySize, int(erroredHost.CurrentPenalty())) // should be counter + penalty - assert.Equal(t, penaltySize, int(erroredHost.load())) + assert.Equal(t, topology.DefaultPenaltySize, int(erroredHost.CurrentLoad())) assert.Equal(t, mhs.hs, mhs.hst) } @@ -204,7 +205,7 @@ func newHostsCluster(hs []string) *cluster { name: "cluster1", } - var hosts []*host + var hosts []*topology.Node replica1 := &replica{ cluster: cluster1, @@ -219,14 +220,12 @@ func newHostsCluster(hs []string) *cluster { Scheme: "http", Host: hs[i], } - hosti := &host{ - replica: replica1, - penalty: 0, - active: 1, - addr: url1, - } + hosti := topology.NewNode(url1, nil, "", replica1.name) + hosti.SetIsActive(true) + hosts = append(hosts, hosti) } + replica1.hosts = hosts return cluster1 @@ -235,7 +234,7 @@ func newHostsCluster(hs []string) *cluster { func newMockScope(hs []string) *scope { c := newHostsCluster(hs) scopedHost := c.replicas[0].hosts[0] - scopedHost.inc() + scopedHost.IncrementConnections() return &scope{ startTime: time.Now(), diff --git a/scope.go b/scope.go index 51fa36a5..ae24a04b 100644 --- a/scope.go +++ b/scope.go @@ -16,6 +16,7 @@ import ( "github.com/contentsquare/chproxy/cache" "github.com/contentsquare/chproxy/config" "github.com/contentsquare/chproxy/internal/heartbeat" + "github.com/contentsquare/chproxy/internal/topology" "github.com/contentsquare/chproxy/log" "github.com/prometheus/client_golang/prometheus" "golang.org/x/time/rate" @@ 
-37,7 +38,7 @@ var nextScopeID = uint64(time.Now().UnixNano()) type scope struct { startTime time.Time id scopeID - host *host + host *topology.Node cluster *cluster user *user clusterUser *clusterUser @@ -82,8 +83,8 @@ func newScope(req *http.Request, u *user, c *cluster, cu *clusterUser, sessionId "user": u.name, "cluster": c.name, "cluster_user": cu.name, - "replica": h.replica.name, - "cluster_node": h.addr.Host, + "replica": h.ReplicaName(), + "cluster_node": h.Host(), }, } return s @@ -94,7 +95,7 @@ func (s *scope) String() string { s.id, s.user.name, s.user.queryCounter.load(), s.clusterUser.name, s.clusterUser.queryCounter.load(), - s.host.addr.Host, s.host.load(), + s.host.Host(), s.host.CurrentLoad(), s.remoteAddr, s.localAddr, time.Since(s.startTime).Nanoseconds()/1000.0) } @@ -179,7 +180,7 @@ func (s *scope) waitUntilAllowStart(sleep time.Duration, deadline time.Time, lab } else { time.Sleep(sleep) } - var h *host + var h *topology.Node // Choose new host, since the previous one may become obsolete // after sleeping. if s.sessionId == "" { @@ -190,8 +191,8 @@ func (s *scope) waitUntilAllowStart(sleep time.Duration, deadline time.Time, lab } s.host = h - s.labels["replica"] = h.replica.name - s.labels["cluster_node"] = h.addr.Host + s.labels["replica"] = h.ReplicaName() + s.labels["cluster_node"] = h.Host() } } @@ -239,7 +240,7 @@ func (s *scope) inc() error { return err } - s.host.inc() + s.host.IncrementConnections() concurrentQueries.With(s.labels).Inc() return nil } @@ -300,7 +301,7 @@ func (s *scope) dec() { s.user.queryCounter.dec() s.clusterUser.queryCounter.dec() - s.host.dec() + s.host.DecrementConnections() concurrentQueries.With(s.labels).Dec() } @@ -313,7 +314,7 @@ func (s *scope) killQuery() error { query := fmt.Sprintf("KILL QUERY WHERE query_id = '%s'", s.id) r := strings.NewReader(query) - addr := s.host.addr.String() + addr := s.host.String() req, err := http.NewRequest("POST", addr, r) if err != nil { return fmt.Errorf("error while creating kill query request to %s: %w", addr, err) @@ -430,8 +431,8 @@ func (s *scope) decorateRequest(req *http.Request) (*http.Request, url.Values) { req.Header.Del("X-ClickHouse-Key") // Send request to the chosen host from cluster. - req.URL.Scheme = s.host.addr.Scheme - req.URL.Host = s.host.addr.Host + req.URL.Scheme = s.host.Scheme() + req.URL.Host = s.host.Host() // Extend ua with additional info, so it may be queried // via system.query_log.http_user_agent. @@ -685,27 +686,12 @@ func newClusterUser(cu config.ClusterUser) *clusterUser { } } -type host struct { - replica *replica - - // Counter of unsuccessful requests to decrease host priority. - penalty uint32 - - // Either the current host is alive. - active uint32 - - // Host address. 
- addr *url.URL - - counter -} - type replica struct { cluster *cluster name string - hosts []*host + hosts []*topology.Node nextHostIdx uint32 } @@ -741,95 +727,32 @@ func newReplicas(replicasCfg []config.Replica, nodes []string, scheme string, c return replicas, nil } -func newNodes(nodes []string, scheme string, r *replica) ([]*host, error) { - hosts := make([]*host, len(nodes)) +func newNodes(nodes []string, scheme string, r *replica) ([]*topology.Node, error) { + hosts := make([]*topology.Node, len(nodes)) for i, node := range nodes { addr, err := url.Parse(fmt.Sprintf("%s://%s", scheme, node)) if err != nil { return nil, fmt.Errorf("cannot parse `node` %q with `scheme` %q: %w", node, scheme, err) } - hosts[i] = &host{ - replica: r, - addr: addr, - } + hosts[i] = topology.NewNode(addr, r.cluster.heartBeat, r.cluster.name, r.name) } return hosts, nil } -func (h *host) runHeartbeat(done <-chan struct{}) { - label := prometheus.Labels{ - "cluster": h.replica.cluster.name, - "replica": h.replica.name, - "cluster_node": h.addr.Host, - } - hb := h.replica.cluster.heartBeat - heartbeat := func() { - if err := hb.IsHealthy(context.Background(), h.addr.String()); err == nil { - atomic.StoreUint32(&h.active, uint32(1)) - hostHealth.With(label).Set(1) - } else { - log.Errorf("error while health-checking %q host: %s", h.addr.Host, err) - atomic.StoreUint32(&h.active, uint32(0)) - hostHealth.With(label).Set(0) - } - } - for { - heartbeat() - select { - case <-done: - return - case <-time.After(hb.Interval()): - } - } -} - -func (h *host) isActive() bool { return atomic.LoadUint32(&h.active) == 1 } - func (r *replica) isActive() bool { // The replica is active if at least a single host is active. for _, h := range r.hosts { - if h.isActive() { + if h.IsActive() { return true } } return false } -const ( - // prevents excess goroutine creating while penalizing overloaded host - penaltySize = 5 - penaltyMaxSize = 300 - penaltyDuration = time.Second * 10 -) - -// decrease host priority for next requests -func (h *host) penalize() { - p := atomic.LoadUint32(&h.penalty) - if p >= penaltyMaxSize { - return - } - hostPenalties.With(prometheus.Labels{ - "cluster": h.replica.cluster.name, - "replica": h.replica.name, - "cluster_node": h.addr.Host, - }).Inc() - atomic.AddUint32(&h.penalty, penaltySize) - time.AfterFunc(penaltyDuration, func() { - atomic.AddUint32(&h.penalty, ^uint32(penaltySize-1)) - }) -} - -// overload runningQueries to take penalty into consideration -func (h *host) load() uint32 { - c := h.counter.load() - p := atomic.LoadUint32(&h.penalty) - return c + p -} - func (r *replica) load() uint32 { var reqs uint32 for _, h := range r.hosts { - reqs += h.load() + reqs += h.CurrentLoad() } return reqs } @@ -972,7 +895,7 @@ func (c *cluster) getReplicaSticky(sessionId string) *replica { // getHostSticky returns host by stickiness from replica. // // Always returns non-nil. 
-func (r *replica) getHostSticky(sessionId string) *host { +func (r *replica) getHostSticky(sessionId string) *topology.Node { idx := atomic.AddUint32(&r.nextHostIdx, 1) n := uint32(len(r.hosts)) if n == 1 { @@ -990,12 +913,12 @@ func (r *replica) getHostSticky(sessionId string) *host { sessionId := hash(sessionId) tmpIdx = (sessionId) % n tmpHSticky := r.hosts[tmpIdx] - log.Debugf("Sticky server candidate is: %s", tmpHSticky.addr) - if !tmpHSticky.isActive() { + log.Debugf("Sticky server candidate is: %s", tmpHSticky) + if !tmpHSticky.IsActive() { log.Debugf("Sticky session server has been picked up, but it is not available") continue } - log.Debugf("Sticky session server is: %s, session_id: %d, server_idx: %d, max nodes in pool: %d", tmpHSticky.addr, sessionId, tmpIdx, n) + log.Debugf("Sticky session server is: %s, session_id: %d, server_idx: %d, max nodes in pool: %d", tmpHSticky, sessionId, tmpIdx, n) return tmpHSticky } @@ -1008,7 +931,7 @@ func (r *replica) getHostSticky(sessionId string) *host { // getHost returns least loaded + round-robin host from replica. // // Always returns non-nil. -func (r *replica) getHost() *host { +func (r *replica) getHost() *topology.Node { idx := atomic.AddUint32(&r.nextHostIdx, 1) n := uint32(len(r.hosts)) if n == 1 { @@ -1017,10 +940,10 @@ func (r *replica) getHost() *host { idx %= n h := r.hosts[idx] - reqs := h.load() + reqs := h.CurrentLoad() // Set least priority to inactive host. - if !h.isActive() { + if !h.IsActive() { reqs = ^uint32(0) } @@ -1032,10 +955,10 @@ func (r *replica) getHost() *host { for i := uint32(1); i < n; i++ { tmpIdx := (idx + i) % n tmpH := r.hosts[tmpIdx] - if !tmpH.isActive() { + if !tmpH.IsActive() { continue } - tmpReqs := tmpH.load() + tmpReqs := tmpH.CurrentLoad() if tmpReqs == 0 { return tmpH } @@ -1054,7 +977,7 @@ func (r *replica) getHost() *host { // getHostSticky returns host based on stickiness from cluster. // // Always returns non-nil. -func (c *cluster) getHostSticky(sessionId string) *host { +func (c *cluster) getHostSticky(sessionId string) *topology.Node { r := c.getReplicaSticky(sessionId) return r.getHostSticky(sessionId) } @@ -1062,7 +985,7 @@ func (c *cluster) getHostSticky(sessionId string) *host { // getHost returns least loaded + round-robin host from cluster. // // Always returns non-nil. 
-func (c *cluster) getHost() *host { +func (c *cluster) getHost() *topology.Node { r := c.getReplica() return r.getHost() } diff --git a/scope_test.go b/scope_test.go index 2158eaa2..597a1bfa 100644 --- a/scope_test.go +++ b/scope_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/contentsquare/chproxy/config" + "github.com/contentsquare/chproxy/internal/topology" "github.com/prometheus/client_golang/prometheus" ) @@ -20,11 +21,8 @@ var ( c = &cluster{ replicas: []*replica{ { - hosts: []*host{ - { - addr: &url.URL{Host: "127.0.0.1"}, - active: 1, - }, + hosts: []*topology.Node{ + topology.NewNode(&url.URL{Host: "127.0.0.1"}, nil, "", "", topology.WithDefaultActiveState(true)), }, }, }, @@ -60,8 +58,8 @@ func TestRunningQueries(t *testing.T) { t.Fatalf("expected runningQueries for cluster user: %d; got: %d", cuq, s.clusterUser.queryCounter.load()) } - if s.host.load() != hq { - t.Fatalf("expected runningQueries for host: %d; got: %d", hq, s.host.load()) + if s.host.CurrentLoad() != hq { + t.Fatalf("expected runningQueries for host: %d; got: %d", hq, s.host.CurrentLoad()) } } @@ -117,22 +115,10 @@ func TestGetHost(t *testing.T) { } r := c.replicas[0] r.cluster = c - r.hosts = []*host{ - { - addr: &url.URL{Host: "127.0.0.1"}, - active: 1, - replica: r, - }, - { - addr: &url.URL{Host: "127.0.0.2"}, - active: 1, - replica: r, - }, - { - addr: &url.URL{Host: "127.0.0.3"}, - active: 1, - replica: r, - }, + r.hosts = []*topology.Node{ + topology.NewNode(&url.URL{Host: "127.0.0.1"}, nil, "", r.name, topology.WithDefaultActiveState(true)), + topology.NewNode(&url.URL{Host: "127.0.0.2"}, nil, "", r.name, topology.WithDefaultActiveState(true)), + topology.NewNode(&url.URL{Host: "127.0.0.3"}, nil, "", r.name, topology.WithDefaultActiveState(true)), } // step | expected | hosts running queries @@ -147,112 +133,72 @@ func TestGetHost(t *testing.T) { // step: 1 h := c.getHost() expected := "127.0.0.2" - if h.addr.Host != expected { - t.Fatalf("got host %q; expected %q", h.addr.Host, expected) + if h.Host() != expected { + t.Fatalf("got host %q; expected %q", h.Host(), expected) } - h.inc() + h.IncrementConnections() // step: 2 h = c.getHost() expected = "127.0.0.3" - if h.addr.Host != expected { - t.Fatalf("got host %q; expected %q", h.addr.Host, expected) + if h.Host() != expected { + t.Fatalf("got host %q; expected %q", h.Host(), expected) } - h.inc() + h.IncrementConnections() // step: 3 h = c.getHost() expected = "127.0.0.1" - if h.addr.Host != expected { - t.Fatalf("got host %q; expected %q", h.addr.Host, expected) + if h.Host() != expected { + t.Fatalf("got host %q; expected %q", h.Host(), expected) } - h.inc() + h.IncrementConnections() // step: 4 h = c.getHost() expected = "127.0.0.2" - if h.addr.Host != expected { - t.Fatalf("got host %q; expected %q", h.addr.Host, expected) + if h.Host() != expected { + t.Fatalf("got host %q; expected %q", h.Host(), expected) } - h.inc() + h.IncrementConnections() // inc last host to get least-loaded 1st host - r.hosts[2].inc() + r.hosts[2].IncrementConnections() // step: 5 h = c.getHost() expected = "127.0.0.1" - if h.addr.Host != expected { - t.Fatalf("got host %q; expected %q", h.addr.Host, expected) + if h.Host() != expected { + t.Fatalf("got host %q; expected %q", h.Host(), expected) } - h.inc() + h.IncrementConnections() // penalize 2nd host h = r.hosts[1] - expRunningQueries := penaltySize + h.load() - h.penalize() - if h.load() != expRunningQueries { - t.Fatalf("got host %q running queries %d; expected %d", h.addr.Host, 
h.load(), expRunningQueries) + expRunningQueries := topology.DefaultPenaltySize + h.CurrentLoad() + h.Penalize() + if h.CurrentLoad() != expRunningQueries { + t.Fatalf("got host %q running queries %d; expected %d", h.Host(), h.CurrentLoad(), expRunningQueries) } // step: 6 // we got "127.0.0.1" because index it's 6th step, hence index is = 0 h = c.getHost() expected = "127.0.0.1" - if h.addr.Host != expected { - t.Fatalf("got host %q; expected %q", h.addr.Host, expected) + if h.Host() != expected { + t.Fatalf("got host %q; expected %q", h.Host(), expected) } - h.inc() + h.IncrementConnections() // step: 7 // we got "127.0.0.3"; index = 1, means to get 2nd host, but it has runningQueries=7 // so we will get next least loaded h = c.getHost() expected = "127.0.0.3" - if h.addr.Host != expected { - t.Fatalf("got host %q; expected %q", h.addr.Host, expected) - } - h.inc() -} - -func TestPenalize(t *testing.T) { - c := &cluster{name: "default"} - c.replicas = []*replica{ - { - cluster: c, - }, - } - h := &host{ - replica: c.replicas[0], - addr: &url.URL{Host: "127.0.0.1"}, - } - exp := uint32(0) - if h.load() != exp { - t.Fatalf("got running queries %d; expected %d", h.load(), exp) - } - - h.penalize() - exp = uint32(penaltySize) - if h.load() != exp { - t.Fatalf("got running queries %d; expected %d", h.load(), exp) - } - - // do more penalties than `penaltyMaxSize` allows - max := int(penaltyMaxSize/penaltySize) * 2 - for i := 0; i < max; i++ { - h.penalize() - } - exp = uint32(penaltyMaxSize) - if h.load() != exp { - t.Fatalf("got running queries %d; expected %d", h.load(), exp) - } - - // but still might increased - h.inc() - exp++ - if h.load() != exp { - t.Fatalf("got running queries %d; expected %d", h.load(), exp) + if h.Host() != expected { + t.Fatalf("got host %q; expected %q", h.Host(), expected) } + h.IncrementConnections() } func TestRunningQueriesConcurrent(t *testing.T) { @@ -273,27 +219,18 @@ func TestGetHostConcurrent(t *testing.T) { c := &cluster{ replicas: []*replica{ { - hosts: []*host{ - { - addr: &url.URL{Host: "127.0.0.1"}, - active: 1, - }, - { - addr: &url.URL{Host: "127.0.0.2"}, - active: 1, - }, - { - addr: &url.URL{Host: "127.0.0.3"}, - active: 1, - }, + hosts: []*topology.Node{ + topology.NewNode(&url.URL{Host: "127.0.0.1"}, nil, "", "", topology.WithDefaultActiveState(true)), + topology.NewNode(&url.URL{Host: "127.0.0.2"}, nil, "", "", topology.WithDefaultActiveState(true)), + topology.NewNode(&url.URL{Host: "127.0.0.3"}, nil, "", "", topology.WithDefaultActiveState(true)), }, }, }, } f := func() { h := c.getHost() - h.inc() - h.dec() + h.IncrementConnections() + h.DecrementConnections() } if err := testConcurrent(f, 1000); err != nil { t.Fatalf("concurrent test err: %s", err) @@ -410,9 +347,7 @@ func TestDecorateRequest(t *testing.T) { user: &user{ params: tc.userParams, }, - host: &host{ - addr: &url.URL{Host: "127.0.0.1"}, - }, + host: topology.NewNode(&url.URL{Host: "127.0.0.1"}, nil, "", ""), } req, _ = s.decorateRequest(req) values := req.URL.Query() @@ -447,8 +382,8 @@ func TestGetHostSticky(t *testing.T) { c := testGetCluster() for i := 0; i < 10000; i++ { sessionId := strconv.Itoa(i % 4) - if exceptedSessionHostMap[sessionId] != c.getHostSticky(sessionId).addr.Host { - t.Fatalf("getHostSticky use sessionId: %s,expected host: %s, get: %s", sessionId, exceptedSessionHostMap[sessionId], c.getHostSticky(sessionId).addr.Host) + if exceptedSessionHostMap[sessionId] != c.getHostSticky(sessionId).Host() { + t.Fatalf("getHostSticky use sessionId: %s,expected host: 
%s, get: %s", sessionId, exceptedSessionHostMap[sessionId], c.getHostSticky(sessionId).Host()) } } } @@ -481,7 +416,7 @@ func testConcurrentQuery(c *cluster, u *user, cu *clusterUser, concurrency int, s.dec() // return wake task's new host // same sessionId should get same host addr - return map[string]string{sessionId: s.host.addr.Host} + return map[string]string{sessionId: s.host.Host()} } for i := 0; i < concurrency; i++ { @@ -509,48 +444,23 @@ func testGetCluster() *cluster { r1 := c.replicas[0] r1.cluster = c - r1.hosts = []*host{ - { - - addr: &url.URL{Host: "127.0.0.11"}, - active: 1, - replica: r1, - }, - { - addr: &url.URL{Host: "127.0.0.22"}, - active: 1, - replica: r1, - }, + r1.hosts = []*topology.Node{ + topology.NewNode(&url.URL{Host: "127.0.0.11"}, nil, "", r1.name, topology.WithDefaultActiveState(true)), + topology.NewNode(&url.URL{Host: "127.0.0.22"}, nil, "", r1.name, topology.WithDefaultActiveState(true)), } r1.name = "replica1" r2 := c.replicas[1] r2.cluster = c - r2.hosts = []*host{ - { - addr: &url.URL{Host: "127.0.0.33"}, - active: 1, - replica: r2, - }, - { - addr: &url.URL{Host: "127.0.0.44"}, - active: 1, - replica: r2, - }, + r2.hosts = []*topology.Node{ + topology.NewNode(&url.URL{Host: "127.0.0.33"}, nil, "", r2.name, topology.WithDefaultActiveState(true)), + topology.NewNode(&url.URL{Host: "127.0.0.44"}, nil, "", r2.name, topology.WithDefaultActiveState(true)), } r2.name = "replica2" r3 := c.replicas[2] r3.cluster = c - r3.hosts = []*host{ - { - addr: &url.URL{Host: "127.0.0.55"}, - active: 1, - replica: r3, - }, - { - addr: &url.URL{Host: "127.0.0.66"}, - active: 1, - replica: r3, - }, + r3.hosts = []*topology.Node{ + topology.NewNode(&url.URL{Host: "127.0.0.55"}, nil, "", r3.name, topology.WithDefaultActiveState(true)), + topology.NewNode(&url.URL{Host: "127.0.0.66"}, nil, "", r3.name, topology.WithDefaultActiveState(true)), } r3.name = "replica3" return c