diff --git a/docs/examples/chashsubset/deployment.yaml b/docs/examples/chashsubset/deployment.yaml
new file mode 100644
index 0000000000..5db8d3b375
--- /dev/null
+++ b/docs/examples/chashsubset/deployment.yaml
@@ -0,0 +1,70 @@
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: nginx-deployment
+  labels:
+    app: nginxhello
+spec:
+  replicas: 10
+  selector:
+    matchLabels:
+      app: nginxhello
+  template:
+    metadata:
+      labels:
+        app: nginxhello
+    spec:
+      containers:
+      - name: nginxhello
+        image: gcr.io/kubernetes-e2e-test-images/echoserver:2.2
+        ports:
+        - containerPort: 8080
+        env:
+        - name: NODE_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: spec.nodeName
+        - name: POD_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.name
+        - name: POD_NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
+        - name: POD_IP
+          valueFrom:
+            fieldRef:
+              fieldPath: status.podIP
+
+---
+kind: Service
+apiVersion: v1
+metadata:
+  name: nginxhello
+  labels:
+    app: nginxhello
+spec:
+  selector:
+    app: nginxhello
+  ports:
+  - name: http
+    port: 80
+    targetPort: 8080
+
+---
+apiVersion: extensions/v1beta1
+kind: Ingress
+metadata:
+  annotations:
+    nginx.ingress.kubernetes.io/upstream-hash-by: "$arg_predictorid"
+    nginx.ingress.kubernetes.io/upstream-hash-by-subset: "true"
+    nginx.ingress.kubernetes.io/upstream-hash-by-subset-size: "3"
+  name: nginxhello-ingress
+  namespace: default
+spec:
+  backend:
+    serviceName: nginxhello
+    servicePort: 80
+
diff --git a/docs/user-guide/nginx-configuration/annotations.md b/docs/user-guide/nginx-configuration/annotations.md
index 638cfed990..1cbddfcc7c 100644
--- a/docs/user-guide/nginx-configuration/annotations.md
+++ b/docs/user-guide/nginx-configuration/annotations.md
@@ -186,10 +186,16 @@ nginx.ingress.kubernetes.io/auth-realm: "realm string"
 NGINX supports load balancing by client-server mapping based on [consistent hashing](http://nginx.org/en/docs/http/ngx_http_upstream_module.html#hash) for a given key. The key can contain text, variables or any combination thereof. This feature allows for request stickiness other than client IP or cookies. The [ketama](http://www.last.fm/user/RJ/journal/2007/04/10/392555/) consistent hashing method will be used which ensures only a few keys would be remapped to different servers on upstream group changes.
 
+There is a special mode of upstream hashing called subset. In this mode, upstream servers are grouped into subsets, and stickiness works by mapping keys to a subset instead of to individual upstream servers. A specific server is then chosen uniformly at random from the selected sticky subset. This provides a balance between stickiness and load distribution.
+
 To enable consistent hashing for a backend:
 
 `nginx.ingress.kubernetes.io/upstream-hash-by`: the nginx variable, text value or any combination thereof to use for consistent hashing. For example `nginx.ingress.kubernetes.io/upstream-hash-by: "$request_uri"` to consistently hash upstream requests by the current request URI.
 
+"subset" hashing can be enabled by setting `nginx.ingress.kubernetes.io/upstream-hash-by-subset`: "true". This maps requests to a subset of nodes instead of a single one. `upstream-hash-by-subset-size` determines the size of each subset (default 3).
+
+Please check the [chashsubset](../../examples/chashsubset/deployment.yaml) example.
+
 ### Custom NGINX load balancing
 
 This is similar to [`load-balance` in ConfigMap](./configmap.md#load-balance), but configures load balancing algorithm per ingress.
 
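For quick reference, the three annotations documented above combine on an Ingress roughly as follows. This is only a minimal sketch; the hash key `$arg_predictorid` and the values are taken from the example manifest added at the top of this change, and the full resource lives in docs/examples/chashsubset/deployment.yaml:

    metadata:
      annotations:
        nginx.ingress.kubernetes.io/upstream-hash-by: "$arg_predictorid"
        nginx.ingress.kubernetes.io/upstream-hash-by-subset: "true"
        nginx.ingress.kubernetes.io/upstream-hash-by-subset-size: "3"
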
diff --git a/internal/ingress/annotations/annotations.go b/internal/ingress/annotations/annotations.go
index 0ada5d6a6a..3c54e5a579 100644
--- a/internal/ingress/annotations/annotations.go
+++ b/internal/ingress/annotations/annotations.go
@@ -92,7 +92,7 @@ type Ingress struct {
 	SessionAffinity    sessionaffinity.Config
 	SSLPassthrough     bool
 	UsePortInRedirects bool
-	UpstreamHashBy     string
+	UpstreamHashBy     upstreamhashby.Config
 	LoadBalancing      string
 	UpstreamVhost      string
 	Whitelist          ipwhitelist.SourceRange
diff --git a/internal/ingress/annotations/annotations_test.go b/internal/ingress/annotations/annotations_test.go
index 4f62c884de..37ca1dd1a5 100644
--- a/internal/ingress/annotations/annotations_test.go
+++ b/internal/ingress/annotations/annotations_test.go
@@ -186,7 +186,7 @@ func TestUpstreamHashBy(t *testing.T) {
 	for _, foo := range fooAnns {
 		ing.SetAnnotations(foo.annotations)
 
-		r := ec.Extract(ing).UpstreamHashBy
+		r := ec.Extract(ing).UpstreamHashBy.UpstreamHashBy
 		if r != foo.er {
 			t.Errorf("Returned %v but expected %v", r, foo.er)
 		}
diff --git a/internal/ingress/annotations/upstreamhashby/main.go b/internal/ingress/annotations/upstreamhashby/main.go
index bfeb5afa58..78f0a9370f 100644
--- a/internal/ingress/annotations/upstreamhashby/main.go
+++ b/internal/ingress/annotations/upstreamhashby/main.go
@@ -27,14 +27,27 @@ type upstreamhashby struct {
 	r resolver.Resolver
 }
 
-// NewParser creates a new CORS annotation parser
+// Config contains the consistent hashing configuration to be used in the Ingress
+type Config struct {
+	UpstreamHashBy           string `json:"upstream-hash-by,omitempty"`
+	UpstreamHashBySubset     bool   `json:"upstream-hash-by-subset,omitempty"`
+	UpstreamHashBySubsetSize int    `json:"upstream-hash-by-subset-size,omitempty"`
+}
+
+// NewParser creates a new UpstreamHashBy annotation parser
 func NewParser(r resolver.Resolver) parser.IngressAnnotation {
 	return upstreamhashby{r}
 }
 
 // Parse parses the annotations contained in the ingress rule
-// used to indicate if the location/s contains a fragment of
-// configuration to be included inside the paths of the rules
 func (a upstreamhashby) Parse(ing *extensions.Ingress) (interface{}, error) {
-	return parser.GetStringAnnotation("upstream-hash-by", ing)
+	upstreamHashBy, _ := parser.GetStringAnnotation("upstream-hash-by", ing)
+	upstreamHashBySubset, _ := parser.GetBoolAnnotation("upstream-hash-by-subset", ing)
+	upstreamHashBySubsetSize, _ := parser.GetIntAnnotation("upstream-hash-by-subset-size", ing)
+
+	if upstreamHashBySubsetSize == 0 {
+		upstreamHashBySubsetSize = 3
+	}
+
+	return &Config{upstreamHashBy, upstreamHashBySubset, upstreamHashBySubsetSize}, nil
 }
diff --git a/internal/ingress/annotations/upstreamhashby/main_test.go b/internal/ingress/annotations/upstreamhashby/main_test.go
index 0f4cee5186..b6c0804fb0 100644
--- a/internal/ingress/annotations/upstreamhashby/main_test.go
+++ b/internal/ingress/annotations/upstreamhashby/main_test.go
@@ -55,7 +55,12 @@ func TestParse(t *testing.T) {
 	for _, testCase := range testCases {
 		ing.SetAnnotations(testCase.annotations)
 		result, _ := ap.Parse(ing)
-		if result != testCase.expected {
+		uc, ok := result.(*Config)
+		if !ok {
+			t.Fatalf("expected a Config type")
+		}
+
+		if uc.UpstreamHashBy != testCase.expected {
 			t.Errorf("expected %v but returned %v, annotations: %s", testCase.expected, result, testCase.annotations)
 		}
 	}
diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go
index c80456dd56..79422334d5 100644
--- a/internal/ingress/controller/controller.go
+++ b/internal/ingress/controller/controller.go
@@ -661,9 +661,13 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
 			if upstreams[defBackend].SecureCACert.Secret == "" {
 				upstreams[defBackend].SecureCACert = anns.SecureUpstream.CACert
 			}
-			if upstreams[defBackend].UpstreamHashBy == "" {
-				upstreams[defBackend].UpstreamHashBy = anns.UpstreamHashBy
+
+			if upstreams[defBackend].UpstreamHashBy.UpstreamHashBy == "" {
+				upstreams[defBackend].UpstreamHashBy.UpstreamHashBy = anns.UpstreamHashBy.UpstreamHashBy
+				upstreams[defBackend].UpstreamHashBy.UpstreamHashBySubset = anns.UpstreamHashBy.UpstreamHashBySubset
+				upstreams[defBackend].UpstreamHashBy.UpstreamHashBySubsetSize = anns.UpstreamHashBy.UpstreamHashBySubsetSize
 			}
+
 			if upstreams[defBackend].LoadBalancing == "" {
 				upstreams[defBackend].LoadBalancing = anns.LoadBalancing
 			}
@@ -725,8 +729,10 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
 				upstreams[name].SecureCACert = anns.SecureUpstream.CACert
 			}
 
-			if upstreams[name].UpstreamHashBy == "" {
-				upstreams[name].UpstreamHashBy = anns.UpstreamHashBy
+			if upstreams[name].UpstreamHashBy.UpstreamHashBy == "" {
+				upstreams[name].UpstreamHashBy.UpstreamHashBy = anns.UpstreamHashBy.UpstreamHashBy
+				upstreams[name].UpstreamHashBy.UpstreamHashBySubset = anns.UpstreamHashBy.UpstreamHashBySubset
+				upstreams[name].UpstreamHashBy.UpstreamHashBySubsetSize = anns.UpstreamHashBy.UpstreamHashBySubsetSize
 			}
 
 			if upstreams[name].LoadBalancing == "" {
diff --git a/internal/ingress/defaults/main.go b/internal/ingress/defaults/main.go
index dc135bb4c7..28bf659799 100644
--- a/internal/ingress/defaults/main.go
+++ b/internal/ingress/defaults/main.go
@@ -113,6 +113,14 @@ type Backend struct {
 	// http://nginx.org/en/docs/http/ngx_http_upstream_module.html#hash
 	UpstreamHashBy string `json:"upstream-hash-by"`
 
+	// Consistent hashing subset flag.
+	// Default: false
+	UpstreamHashBySubset bool `json:"upstream-hash-by-subset"`
+
+	// Subset consistent hashing: size of each subset.
+	// Default: 3
+	UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size"`
+
 	// Let's us choose a load balancing algorithm per ingress
 	LoadBalancing string `json:"load-balance"`
 
diff --git a/internal/ingress/types.go b/internal/ingress/types.go
index 14f1dfd24e..01aa1952ae 100644
--- a/internal/ingress/types.go
+++ b/internal/ingress/types.go
@@ -93,7 +93,7 @@ type Backend struct {
 	// StickySessionAffinitySession contains the StickyConfig object with stickyness configuration
 	SessionAffinity SessionAffinityConfig `json:"sessionAffinityConfig"`
 	// Consistent hashing by NGINX variable
-	UpstreamHashBy string `json:"upstream-hash-by,omitempty"`
+	UpstreamHashBy UpstreamHashByConfig `json:"upstreamHashByConfig,omitempty"`
 	// LB algorithm configuration per ingress
 	LoadBalancing string `json:"load-balance,omitempty"`
 	// Denotes if a backend has no server. The backend instead shares a server with another backend and acts as an
@@ -150,6 +150,13 @@ type CookieSessionAffinity struct {
 	Path string `json:"path,omitempty"`
 }
 
+// UpstreamHashByConfig describes the settings from the upstream-hash-by* annotations.
+type UpstreamHashByConfig struct {
+	UpstreamHashBy           string `json:"upstream-hash-by,omitempty"`
+	UpstreamHashBySubset     bool   `json:"upstream-hash-by-subset,omitempty"`
+	UpstreamHashBySubsetSize int    `json:"upstream-hash-by-subset-size,omitempty"`
+}
+
 // Endpoint describes a kubernetes endpoint in a backend
 // +k8s:deepcopy-gen=true
 type Endpoint struct {
diff --git a/internal/ingress/types_equals.go b/internal/ingress/types_equals.go
index eea824aad8..e0a6211584 100644
--- a/internal/ingress/types_equals.go
+++ b/internal/ingress/types_equals.go
@@ -234,6 +234,27 @@ func (csa1 *CookieSessionAffinity) Equal(csa2 *CookieSessionAffinity) bool {
 	return true
 }
 
+// Equal checks the equality between two UpstreamHashByConfig types
+func (u1 *UpstreamHashByConfig) Equal(u2 *UpstreamHashByConfig) bool {
+	if u1 == u2 {
+		return true
+	}
+	if u1 == nil || u2 == nil {
+		return false
+	}
+	if u1.UpstreamHashBy != u2.UpstreamHashBy {
+		return false
+	}
+	if u1.UpstreamHashBySubset != u2.UpstreamHashBySubset {
+		return false
+	}
+	if u1.UpstreamHashBySubsetSize != u2.UpstreamHashBySubsetSize {
+		return false
+	}
+
+	return true
+}
+
 // Equal checks the equality against an Endpoint
 func (e1 *Endpoint) Equal(e2 *Endpoint) bool {
 	if e1 == e2 {
diff --git a/rootfs/etc/nginx/lua/balancer.lua b/rootfs/etc/nginx/lua/balancer.lua
index 4fcf035ff6..e12c9253e6 100644
--- a/rootfs/etc/nginx/lua/balancer.lua
+++ b/rootfs/etc/nginx/lua/balancer.lua
@@ -5,6 +5,7 @@ local dns_util = require("util.dns")
 local configuration = require("configuration")
 local round_robin = require("balancer.round_robin")
 local chash = require("balancer.chash")
+local chashsubset = require("balancer.chashsubset")
 local sticky = require("balancer.sticky")
 local ewma = require("balancer.ewma")
 
@@ -17,6 +18,7 @@ local DEFAULT_LB_ALG = "round_robin"
 local IMPLEMENTATIONS = {
   round_robin = round_robin,
   chash = chash,
+  chashsubset = chashsubset,
   sticky = sticky,
   ewma = ewma,
 }
@@ -29,8 +31,12 @@ local function get_implementation(backend)
   if backend["sessionAffinityConfig"] and backend["sessionAffinityConfig"]["name"] == "cookie" then
     name = "sticky"
-  elseif backend["upstream-hash-by"] then
-    name = "chash"
+  elseif backend["upstreamHashByConfig"] and backend["upstreamHashByConfig"]["upstream-hash-by"] then
+    if backend["upstreamHashByConfig"]["upstream-hash-by-subset"] then
+      name = "chashsubset"
+    else
+      name = "chash"
+    end
   end
 
   local implementation = IMPLEMENTATIONS[name]
diff --git a/rootfs/etc/nginx/lua/balancer/chash.lua b/rootfs/etc/nginx/lua/balancer/chash.lua
index ffd9f54e01..3eddab9e58 100644
--- a/rootfs/etc/nginx/lua/balancer/chash.lua
+++ b/rootfs/etc/nginx/lua/balancer/chash.lua
@@ -8,7 +8,7 @@ function _M.new(self, backend)
   local nodes = util.get_nodes(backend.endpoints)
   local o = {
     instance = self.factory:new(nodes),
-    hash_by = backend["upstream-hash-by"],
+    hash_by = backend["upstreamHashByConfig"]["upstream-hash-by"],
     traffic_shaping_policy = backend.trafficShapingPolicy,
     alternative_backends = backend.alternativeBackends,
   }
diff --git a/rootfs/etc/nginx/lua/balancer/chashsubset.lua b/rootfs/etc/nginx/lua/balancer/chashsubset.lua
new file mode 100644
index 0000000000..9599378d62
--- /dev/null
+++ b/rootfs/etc/nginx/lua/balancer/chashsubset.lua
@@ -0,0 +1,84 @@
+-- Consistent hashing to a subset of nodes. Instead of returning the same node
+-- always, we return the same subset always.
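+--
+-- For example, with 9 endpoints and a subset size of 3, build_subset_map() below
+-- produces three subsets ("set1".."set3"); the consistent hash over the configured
+-- key selects one subset, and balance() then picks an endpoint of that subset
+-- uniformly at random. When the endpoint count is not a multiple of the subset
+-- size, the last subset is padded by reusing endpoints from the start of the list
+-- so that every subset has exactly subset_size members.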
+
+local resty_chash = require("resty.chash")
+local util = require("util")
+
+local _M = { name = "chashsubset" }
+
+local function build_subset_map(backend)
+  local endpoints = {}
+  local subset_map = {}
+  local subsets = {}
+  local subset_size = backend["upstreamHashByConfig"]["upstream-hash-by-subset-size"]
+
+  for _, endpoint in pairs(backend.endpoints) do
+    table.insert(endpoints, endpoint)
+  end
+
+  local set_count = math.ceil(#endpoints/subset_size)
+  local node_count = set_count * subset_size
+
+  -- if we don't have enough endpoints, we reuse endpoints in the last set to
+  -- keep the same number on all of them.
+  local j = 1
+  for _ = #endpoints+1, node_count do
+    table.insert(endpoints, endpoints[j])
+    j = j+1
+  end
+
+  local k = 1
+  for i = 1, set_count do
+    local subset = {}
+    local subset_id = "set" .. tostring(i)
+    for _ = 1, subset_size do
+      table.insert(subset, endpoints[k])
+      k = k+1
+    end
+    subsets[subset_id] = subset
+    subset_map[subset_id] = 1
+  end
+
+  return subset_map, subsets
+end
+
+function _M.new(self, backend)
+  local subset_map, subsets = build_subset_map(backend)
+
+  local o = {
+    instance = resty_chash:new(subset_map),
+    hash_by = backend["upstreamHashByConfig"]["upstream-hash-by"],
+    subsets = subsets,
+    current_endpoints = backend.endpoints
+  }
+  setmetatable(o, self)
+  self.__index = self
+  return o
+end
+
+function _M.balance(self)
+  local key = util.lua_ngx_var(self.hash_by)
+  local subset_id = self.instance:find(key)
+  local endpoints = self.subsets[subset_id]
+  local endpoint = endpoints[math.random(#endpoints)]
+  return endpoint.address .. ":" .. endpoint.port
+end
+
+function _M.sync(self, backend)
+  local subset_map
+
+  local changed = not util.deep_compare(self.current_endpoints, backend.endpoints)
+  if not changed then
+    return
+  end
+
+  self.current_endpoints = backend.endpoints
+
+  subset_map, self.subsets = build_subset_map(backend)
+
+  self.instance:reinit(subset_map)
+
+  return
+end
+
+return _M
diff --git a/rootfs/etc/nginx/lua/test/balancer/chash_test.lua b/rootfs/etc/nginx/lua/test/balancer/chash_test.lua
index 8cdcef1bce..d71dfc56ab 100644
--- a/rootfs/etc/nginx/lua/test/balancer/chash_test.lua
+++ b/rootfs/etc/nginx/lua/test/balancer/chash_test.lua
@@ -16,7 +16,7 @@ describe("Balancer chash", function()
     end
 
     local backend = {
-      name = "my-dummy-backend", ["upstream-hash-by"] = "$request_uri",
+      name = "my-dummy-backend", upstreamHashByConfig = { ["upstream-hash-by"] = "$request_uri" },
      endpoints = { { address = "10.184.7.40", port = "8080", maxFails = 0, failTimeout = 0 } }
     }
     local instance = balancer_chash:new(backend)
diff --git a/rootfs/etc/nginx/lua/test/balancer/chashsubset_test.lua b/rootfs/etc/nginx/lua/test/balancer/chashsubset_test.lua
new file mode 100644
index 0000000000..6bbd582dd3
--- /dev/null
+++ b/rootfs/etc/nginx/lua/test/balancer/chashsubset_test.lua
@@ -0,0 +1,82 @@
+
+local function get_test_backend(n_endpoints)
+  local backend = {
+    name = "my-dummy-backend",
+    ["upstreamHashByConfig"] = {
+      ["upstream-hash-by"] = "$request_uri",
+      ["upstream-hash-by-subset"] = true,
+      ["upstream-hash-by-subset-size"] = 3
+    },
+    endpoints = {}
+  }
+
+  for i = 1, n_endpoints do
+    backend.endpoints[i] = { address = "10.184.7." .. tostring(i), port = "8080", maxFails = 0, failTimeout = 0 }
+  end
+
+  return backend
+end
+
+describe("Balancer chash subset", function()
+  local balancer_chashsubset = require("balancer.chashsubset")
+
+  describe("balance()", function()
+    it("returns peers from the same subset", function()
+      _G.ngx = { var = { request_uri = "/alma/armud" }}
+
+      local backend = get_test_backend(9)
+
+      local instance = balancer_chashsubset:new(backend)
+
+      instance:sync(backend)
+
+      local first_node = instance:balance()
+      local subset_id
+      local endpoint_strings
+
+      local function has_value(tab, val)
+        for _, value in ipairs(tab) do
+          if value == val then
+            return true
+          end
+        end
+
+        return false
+      end
+
+      for id, endpoints in pairs(instance["subsets"]) do
+        endpoint_strings = {}
+        for _, endpoint in pairs(endpoints) do
+          local endpoint_string = endpoint.address .. ":" .. endpoint.port
+          table.insert(endpoint_strings, endpoint_string)
+          if first_node == endpoint_string then
+            -- found the set of first_node
+            subset_id = id
+          end
+        end
+        if subset_id then
+          break
+        end
+      end
+
+      -- multiple calls to balance must return nodes from the same subset
+      for i = 0, 10 do
+        assert.True(has_value(endpoint_strings, instance:balance()))
+      end
+    end)
+  end)
+  describe("new(backend)", function()
+    it("fills last subset correctly", function()
+      _G.ngx = { var = { request_uri = "/alma/armud" }}
+
+      local backend = get_test_backend(7)
+
+      local instance = balancer_chashsubset:new(backend)
+
+      instance:sync(backend)
+      for id, endpoints in pairs(instance["subsets"]) do
+        assert.are.equal(#endpoints, 3)
+      end
+    end)
+  end)
+end)
diff --git a/rootfs/etc/nginx/lua/test/balancer_test.lua b/rootfs/etc/nginx/lua/test/balancer_test.lua
index 879d5ed8f0..89f068b29c 100644
--- a/rootfs/etc/nginx/lua/test/balancer_test.lua
+++ b/rootfs/etc/nginx/lua/test/balancer_test.lua
@@ -32,7 +32,10 @@ local function reset_backends()
       sessionAffinityConfig = { name = "", cookieSessionAffinity = { name = "", hash = "" } },
     },
     { name = "my-dummy-app-1", ["load-balance"] = "round_robin", },
-    { name = "my-dummy-app-2", ["load-balance"] = "round_robin", ["upstream-hash-by"] = "$request_uri", },
+    {
+      name = "my-dummy-app-2", ["load-balance"] = "chash",
+      upstreamHashByConfig = { ["upstream-hash-by"] = "$request_uri", },
+    },
     { name = "my-dummy-app-3", ["load-balance"] = "ewma",
       sessionAffinityConfig = { name = "cookie", cookieSessionAffinity = { name = "route", hash = "sha1" } }
diff --git a/test/e2e/annotations/upstreamhashby.go b/test/e2e/annotations/upstreamhashby.go
new file mode 100644
index 0000000000..b893c5d33f
--- /dev/null
+++ b/test/e2e/annotations/upstreamhashby.go
@@ -0,0 +1,106 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package annotations
+
+import (
+	"fmt"
+	"net/http"
+	"regexp"
+	"strings"
+
+	. "github.com/onsi/ginkgo"
"github.com/onsi/gomega" + "github.com/parnurzeal/gorequest" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/ingress-nginx/test/e2e/framework" +) + +func startIngress(f *framework.Framework, annotations *map[string]string) map[string]bool { + host := "upstream-hash-by.foo.com" + + ing := framework.NewSingleIngress(host, "/", host, f.IngressController.Namespace, "http-svc", 80, annotations) + f.EnsureIngress(ing) + f.WaitForNginxServer(host, + func(server string) bool { + return strings.Contains(server, fmt.Sprintf("server_name %s ;", host)) + }) + + err := wait.PollImmediate(framework.Poll, framework.DefaultTimeout, func() (bool, error) { + resp, _, _ := gorequest.New(). + Get(f.IngressController.HTTPURL). + Set("Host", host). + End() + if resp.StatusCode == http.StatusOK { + return true, nil + } + return false, nil + }) + Expect(err).Should(BeNil()) + + re, _ := regexp.Compile(`Hostname: http-svc.*`) + podMap := map[string]bool{} + + for i := 0; i < 100; i++ { + _, body, errs := gorequest.New(). + Get(f.IngressController.HTTPURL). + Set("Host", host). + End() + + Expect(errs).Should(BeEmpty()) + + podName := re.FindString(body) + Expect(podName).ShouldNot(Equal("")) + podMap[podName] = true + + } + + return podMap +} + +var _ = framework.IngressNginxDescribe("Annotations - UpstreamHashBy", func() { + f := framework.NewDefaultFramework("upstream-hash-by") + + BeforeEach(func() { + f.NewEchoDeploymentWithReplicas(6) + }) + + AfterEach(func() { + }) + + It("should connect to the same pod", func() { + annotations := map[string]string{ + "nginx.ingress.kubernetes.io/upstream-hash-by": "$request_uri", + } + + podMap := startIngress(f, &annotations) + Expect(len(podMap)).Should(Equal(1)) + + }) + + It("should connect to the same subset of pods", func() { + annotations := map[string]string{ + "nginx.ingress.kubernetes.io/upstream-hash-by": "$request_uri", + "nginx.ingress.kubernetes.io/upstream-hash-by-subset": "true", + "nginx.ingress.kubernetes.io/upstream-hash-by-subset-size": "3", + } + + podMap := startIngress(f, &annotations) + Expect(len(podMap)).Should(Equal(3)) + + }) +})